Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2374 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5314/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2374 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5203/ --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2374#discussion_r196398250 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java --- @@ -291,6 +318,17 @@ public void write(DataOutput out) throws IOException { } } out.writeBoolean(isSchemaModified); + + out.writeUTF(format); + boolean isFormatPropertiesExists = null != formatProperties && formatProperties.size() > 0; + out.writeBoolean(isFormatPropertiesExists); + if (isFormatPropertiesExists) { + out.writeShort(formatProperties.size()); --- End diff -- The formatProperties is a Java Map that was converted from a Scala Map. When I try to write this map directly as binary, a serialization problem occurs. --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/2374 @jackylk All the comments have been resolved except https://github.com/apache/carbondata/pull/2374#discussion_r195684966 --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2374 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6387/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2374 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5222/ --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2374#discussion_r196508522 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CsvRecordReader.java --- @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.UnsupportedEncodingException; +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.intf.RowImpl; +import org.apache.carbondata.core.scan.filter.intf.RowIntf; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.statusmanager.FileFormatProperties; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; + +import com.univocity.parsers.csv.CsvParser; +import com.univocity.parsers.csv.CsvParserSettings; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +/** + * scan csv file and filter on it + */ +@InterfaceStability.Evolving +@InterfaceAudience.Internal +public class CsvRecordReader<T> extends AbstractRecordReader<T> { + private static final LogService LOGGER = LogServiceFactory.getLogService( + CsvRecordReader.class.getName()); + private static final int MAX_BATCH_SIZE = + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT; + // vector reader + private boolean 
isVectorReader; + private T columnarBatch; + + // metadata + private CarbonTable carbonTable; + private CarbonColumn[] carbonColumns; + // input + private QueryModel queryModel; + private CarbonReadSupport<T> readSupport; + private FileSplit fileSplit; + private Configuration hadoopConf; + // the index is schema ordinal, the value is the csv ordinal + private int[] schema2csvIdx; + + // filter + private FilterExecuter filter; + // the index is the dimension ordinal, the value is the schema ordinal + private int[] filterColumn2SchemaIdx; + private Object[] internalValues; + private RowIntf internalRow; + + // output + private CarbonColumn[] projection; + // the index is the projection column ordinal, the value is the schema ordinal + private int[] projectionColumn2SchemaIdx; + private Object[] outputValues; + private Object[][] batchOutputValues; + private T outputRow; + + // inputMetricsStats + private InputMetricsStats inputMetricsStats; + + // scan + private Reader reader; + private CsvParser csvParser; + + public CsvRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport) { + this.queryModel = queryModel; + this.readSupport = readSupport; + } + + public CsvRecordReader(QueryModel queryModel, CarbonReadSupport readSupport, + InputMetricsStats inputMetricsStats) { + this(queryModel, readSupport); + this.inputMetricsStats = inputMetricsStats; + } + + public boolean isVectorReader() { + return isVectorReader; + } + + public void setVectorReader(boolean vectorReader) { + isVectorReader = vectorReader; + } + + public void setQueryModel(QueryModel queryModel) { + this.queryModel = queryModel; + } + + public void setInputMetricsStats(InputMetricsStats inputMetricsStats) { + this.inputMetricsStats = inputMetricsStats; + } + + public void setReadSupport(CarbonReadSupport<T> readSupport) { + this.readSupport = readSupport; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + if 
(split instanceof CarbonInputSplit) { + fileSplit = (CarbonInputSplit) split; + } else if (split instanceof CarbonMultiBlockSplit) { + fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0); + } else { + fileSplit = (FileSplit) split; + } + + hadoopConf = context.getConfiguration(); + if (queryModel == null) { + CarbonTableInputFormat inputFormat = new CarbonTableInputFormat<Object>(); + queryModel = inputFormat.createQueryModel(split, context); + } + + carbonTable = queryModel.getTable(); + + // since the sequence of csv header, schema, carbon internal row, projection are different, + // we need to init the column mappings + initializedIdxMapping(); + + // init filter + if (null != queryModel.getFilterExpressionResolverTree()) { + initializeFilter(); + } + + // init reading + initializeCsvReader(); + + this.readSupport.initialize(projection, carbonTable); + } + + private void initializedIdxMapping() { + carbonColumns = + carbonTable.getCreateOrderColumn(carbonTable.getTableName()).toArray(new CarbonColumn[0]); + // for schema to csv mapping + schema2csvIdx = new int[carbonColumns.length]; + if (!carbonTable.getTableInfo().getFormatProperties().containsKey( + FileFormatProperties.CSV.HEADER)) { + // if no header specified, it means that they are the same + LOGGER.info("no header specified, will take the schema from table as header"); + for (int i = 0; i < carbonColumns.length; i++) { + schema2csvIdx[i] = i; + } + } else { + String[] csvHeader = carbonTable.getTableInfo().getFormatProperties().get( + FileFormatProperties.CSV.HEADER).split(","); + for (int i = 0; i < csvHeader.length; i++) { + boolean found = false; + for (int j = 0; j < carbonColumns.length; j++) { + if (StringUtils.strip(csvHeader[i]).equalsIgnoreCase(carbonColumns[j].getColName())) { + schema2csvIdx[carbonColumns[j].getSchemaOrdinal()] = i; + found = true; + break; + } + } + if (!found) { + throw new RuntimeException( + String.format("Can not find csv header '%s' in table fields", 
csvHeader[i])); + } + } + } + + // for carbon internal row to schema mapping + filterColumn2SchemaIdx = new int[carbonColumns.length]; + int filterIdx = 0; + for (CarbonDimension dimension : carbonTable.getDimensions()) { + filterColumn2SchemaIdx[filterIdx++] = dimension.getSchemaOrdinal(); + } + for (CarbonMeasure measure : carbonTable.getMeasures()) { + filterColumn2SchemaIdx[filterIdx++] = measure.getSchemaOrdinal(); + } + + // for projects to schema mapping + projection = queryModel.getProjectionColumns(); + projectionColumn2SchemaIdx = new int[projection.length]; + + for (int i = 0; i < projection.length; i++) { + for (int j = 0; j < carbonColumns.length; j++) { + if (projection[i].getColName().equals(carbonColumns[j].getColName())) { + projectionColumn2SchemaIdx[i] = projection[i].getSchemaOrdinal(); + break; + } + } + } + } + + private void initializeFilter() { + List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil + .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()), + carbonTable.getMeasureByTableName(carbonTable.getTableName())); + int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()]; + for (int i = 0; i < dimLensWithComplex.length; i++) { + dimLensWithComplex[i] = Integer.MAX_VALUE; + } + + int[] dictionaryColumnCardinality = + CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList); + SegmentProperties segmentProperties = + new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality); + Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>(); + + FilterResolverIntf resolverIntf = queryModel.getFilterExpressionResolverTree(); + filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties, + complexDimensionInfoMap); + // for row filter, we need update column index + FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(), + carbonTable.getDimensionOrdinalMax()); + } + + private void initializeCsvReader() throws 
IOException { + internalValues = new Object[carbonColumns.length]; + internalRow = new RowImpl(); + internalRow.setValues(internalValues); + + outputValues = new Object[projection.length]; + batchOutputValues = new Object[MAX_BATCH_SIZE][projection.length]; + + Path file = fileSplit.getPath(); + FileSystem fs = file.getFileSystem(hadoopConf); + int bufferSize = Integer.parseInt( + hadoopConf.get(CSVInputFormat.READ_BUFFER_SIZE, CSVInputFormat.READ_BUFFER_SIZE_DEFAULT)); + // note that here we read the whole file, not a split of the file + FSDataInputStream fsStream = fs.open(file, bufferSize); + reader = new InputStreamReader(fsStream, CarbonCommonConstants.DEFAULT_CHARSET); + // use default csv settings first, then update it using user specified properties later + CsvParserSettings settings = CSVInputFormat.extractCsvParserSettings(hadoopConf); + initCsvSettings(settings); + csvParser = new CsvParser(settings); + csvParser.beginParsing(reader); + } + + /** + * update the settings using properties from user + */ + private void initCsvSettings(CsvParserSettings settings) { + Map<String, String> csvProperties = carbonTable.getTableInfo().getFormatProperties(); + + if (csvProperties.containsKey(FileFormatProperties.CSV.DELIMITER)) { + settings.getFormat().setDelimiter( + csvProperties.get(FileFormatProperties.CSV.DELIMITER).charAt(0)); + } + + if (csvProperties.containsKey(FileFormatProperties.CSV.COMMENT)) { + settings.getFormat().setComment( + csvProperties.get(FileFormatProperties.CSV.COMMENT).charAt(0)); + } + + if (csvProperties.containsKey(FileFormatProperties.CSV.QUOTE)) { + settings.getFormat().setQuote( + csvProperties.get(FileFormatProperties.CSV.QUOTE).charAt(0)); + } + + if (csvProperties.containsKey(FileFormatProperties.CSV.ESCAPE)) { + settings.getFormat().setQuoteEscape( + csvProperties.get(FileFormatProperties.CSV.ESCAPE).charAt(0)); + } + + if (csvProperties.containsKey(FileFormatProperties.CSV.SKIP_EMPTY_LINE)) { + settings.setSkipEmptyLines( + 
Boolean.parseBoolean(csvProperties.get(FileFormatProperties.CSV.SKIP_EMPTY_LINE))); + } + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (isVectorReader) { + return nextColumnarBatch(); + } + + return nextRow(); + } + + private boolean nextColumnarBatch() throws IOException { + return scanAndFillBatch(); + } + + private boolean scanAndFillBatch() throws IOException { + int rowNum = 0; + if (null == filter) { + while (readRowFromFile() && rowNum < MAX_BATCH_SIZE) { + System.arraycopy(outputValues, 0, batchOutputValues[rowNum++], 0, outputValues.length); + } + } else { + try { + while (readRowFromFile() && rowNum < MAX_BATCH_SIZE) { + if (filter.applyFilter(internalRow, carbonTable.getDimensionOrdinalMax())) { + System.arraycopy(outputValues, 0, batchOutputValues[rowNum++], 0, outputValues.length); + } + } + } catch (FilterUnsupportedException e) { + throw new IOException("Failed to filter row in CarbonCsvRecordReader", e); + } + } + if (rowNum < MAX_BATCH_SIZE) { + Object[][] tmpBatchOutputValues = new Object[rowNum][]; + for (int i = 0; i < rowNum; i++) { + tmpBatchOutputValues[i] = batchOutputValues[i]; + } + System.arraycopy(batchOutputValues, 0, tmpBatchOutputValues, 0, rowNum); + for (int i = 0; i < tmpBatchOutputValues.length; i++) { + } + columnarBatch = readSupport.readRow(tmpBatchOutputValues); + } else { + columnarBatch = readSupport.readRow(batchOutputValues); + } + return rowNum > 0; + } + + private boolean nextRow() throws IOException { + if (csvParser == null) { + return false; + } + + if (!readRowFromFile()) { + return false; + } + + if (null == filter) { + putRowToSparkRow(); + return true; + } else { + try { + boolean scanMore; + do { + scanMore = !filter.applyFilter(internalRow, carbonTable.getDimensionOrdinalMax()); + if (!scanMore) { + putRowToSparkRow(); + return true; + } + } while (readRowFromFile()); + // if we read the end of file and still need scanMore, it means that there is no row + return 
false; + } catch (FilterUnsupportedException e) { + throw new IOException("Failed to filter row in CarbonCsvRecordReader", e); + } + } + } + + private void putRowToSparkRow() { --- End diff -- method name is not correct and this method can be removed --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2374#discussion_r196509716 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CsvRecordReader.java --- @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.UnsupportedEncodingException; +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.intf.RowImpl; +import org.apache.carbondata.core.scan.filter.intf.RowIntf; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.statusmanager.FileFormatProperties; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; + +import com.univocity.parsers.csv.CsvParser; +import com.univocity.parsers.csv.CsvParserSettings; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +/** + * scan csv file and filter on it + */ +@InterfaceStability.Evolving +@InterfaceAudience.Internal +public class CsvRecordReader<T> extends AbstractRecordReader<T> { --- End diff -- This class is much like StreamRecordReader, and it implements filter execution on internal row, can you extract common code to a parent class? --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2374#discussion_r196509935 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java --- @@ -174,9 +174,15 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO List<InputSplit> result = new LinkedList<InputSplit>(); // for each segment fetch blocks matching filter in Driver BTree - List<CarbonInputSplit> dataBlocksOfSegment = - getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions, - validSegments, partitionInfo, oldPartitionIdList); + List<CarbonInputSplit> dataBlocksOfSegment; + if (carbonTable.getTableInfo().getFormat().equals("") --- End diff -- why support empty string? --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2374#discussion_r196510278 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java --- @@ -515,12 +574,73 @@ private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) th return split; } + private List<CarbonInputSplit> convertToInputSplit4ExternalFormat(JobContext jobContext, + ExtendedBlocklet extendedBlocklet) throws IOException { + List<CarbonInputSplit> splits = new ArrayList<CarbonInputSplit>(); + String factFilePath = extendedBlocklet.getFilePath(); + Path path = new Path(factFilePath); + FileSystem fs = FileFactory.getFileSystem(path); + FileStatus fileStatus = fs.getFileStatus(path); + long length = fileStatus.getLen(); + if (length != 0) { + BlockLocation[] blkLocations = fs.getFileBlockLocations(path, 0, length); + long blkSize = fileStatus.getBlockSize(); + long minSplitSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(jobContext)); + long maxSplitSize = getMaxSplitSize(jobContext); + long splitSize = computeSplitSize(blkSize, minSplitSize, maxSplitSize); + long bytesRemaining = fileStatus.getLen(); + while (((double) bytesRemaining) / splitSize > 1.1) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path, + length - bytesRemaining, + splitSize, blkLocations[blkIndex].getHosts(), + blkLocations[blkIndex].getCachedHosts(), FileFormat.EXTERNAL)); + bytesRemaining -= splitSize; + } + if (bytesRemaining != 0) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path, + length - bytesRemaining, + bytesRemaining, blkLocations[blkIndex].getHosts(), + blkLocations[blkIndex].getCachedHosts(), FileFormat.EXTERNAL)); + } + } else { + splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path, 0, length, + new String[0], FileFormat.EXTERNAL)); + } + return splits; + } + @Override 
public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { Configuration configuration = taskAttemptContext.getConfiguration(); QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext); CarbonReadSupport<T> readSupport = getReadSupportClass(configuration); - return new CarbonRecordReader<T>(queryModel, readSupport); + if (inputSplit instanceof CarbonMultiBlockSplit + && ((CarbonMultiBlockSplit) inputSplit).getFileFormat() == FileFormat.EXTERNAL) { + return createRecordReaderForExternalFormat(queryModel, readSupport, + configuration.get(CarbonCommonConstants.CARBON_EXTERNAL_FORMAT_CONF_KEY)); + } else if (inputSplit instanceof CarbonInputSplit + && ((CarbonInputSplit) inputSplit).getFileFormat() == FileFormat.EXTERNAL) { + return createRecordReaderForExternalFormat(queryModel, readSupport, + configuration.get(CarbonCommonConstants.CARBON_EXTERNAL_FORMAT_CONF_KEY)); + } else { + return new CarbonRecordReader<T>(queryModel, readSupport); + } + } + + @Since("1.4.1") --- End diff -- I think for private method, this annotation is not required --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2374#discussion_r196510839 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala --- @@ -426,6 +439,22 @@ class CarbonScanRDD[T: ClassTag]( CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId)) streamReader.setQueryModel(model) streamReader + case FileFormat.EXTERNAL => + assert(storageFormat.equals("csv"), --- End diff -- should use if check instead of assert --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2374#discussion_r196511544 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala --- @@ -700,6 +700,13 @@ class TableNewProcessor(cm: TableModel) { cm.tableName)) tableInfo.setLastUpdatedTime(System.currentTimeMillis()) tableInfo.setFactTable(tableSchema) + val format = cm.tableProperties.get(CarbonCommonConstants.FORMAT) --- End diff -- `format` table property should also be checked, now only csv is supported --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2374#discussion_r196512126 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddSegmentCommand.scala --- @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.management + +import java.util.UUID + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.execution.command.AtomicRunnableCommand +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.util.FileUtils + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datamap.status.DataMapStatusManager +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.{OperationContext, OperationListenerBus} +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} +import org.apache.carbondata.processing.util.CarbonLoaderUtil + +/** + * support `alter table tableName add segment location 'path'` command. 
+ * It will create a segment and map the path of datafile to segment's storage + */ +case class CarbonAddSegmentCommand( + dbNameOp: Option[String], + tableName: String, + filePathFromUser: String, + var operationContext: OperationContext = new OperationContext) extends AtomicRunnableCommand { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + var carbonTable: CarbonTable = _ + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + val dbName = CarbonEnv.getDatabaseName(dbNameOp)(sparkSession) + carbonTable = { + val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore + .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] + if (relation == null) { + LOGGER.error(s"Add segment failed due to table $dbName.$tableName not found") + throw new NoSuchTableException(dbName, tableName) + } + relation.carbonTable + } + + if (carbonTable.isHivePartitionTable) { + LOGGER.error("Ignore hive partition table for now") + } + + operationContext.setProperty("isOverwrite", false) + if (CarbonUtil.hasAggregationDataMap(carbonTable)) { + val loadMetadataEvent = new LoadMetadataEvent(carbonTable, false) + OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext) + } + Seq.empty + } + + // will just mapping external files to segment metadata + override def processData(sparkSession: SparkSession): Seq[Row] = { --- End diff -- All these operations are metadata only, so I think this class should extend `MetadataProcessOpeation` instead --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2374#discussion_r196512608 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala --- @@ -403,6 +403,17 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { partition = partitionSpec) } + /** + * The syntax of + * ALTER TABLE [dbName.]tableName ADD SEGMENT LOCATION 'path/to/data' + */ + protected lazy val addSegment: Parser[LogicalPlan] = + ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ + ADD ~ SEGMENT ~ LOCATION ~ stringLit <~ opt(";") ^^ { + case dbName ~ tableName ~ add ~ segment ~ location ~ filePath => --- End diff -- I think it should be `case dbName ~ tableName ~ filePath =>` --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2374 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5335/ --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2374#discussion_r196624366 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CsvRecordReader.java --- @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.UnsupportedEncodingException; +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.intf.RowImpl; +import org.apache.carbondata.core.scan.filter.intf.RowIntf; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.statusmanager.FileFormatProperties; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; + +import com.univocity.parsers.csv.CsvParser; +import com.univocity.parsers.csv.CsvParserSettings; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +/** + * scan csv file and filter on it + */ +@InterfaceStability.Evolving +@InterfaceAudience.Internal +public class CsvRecordReader<T> extends AbstractRecordReader<T> { + private static final LogService LOGGER = LogServiceFactory.getLogService( + CsvRecordReader.class.getName()); + private static final int MAX_BATCH_SIZE = + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT; + // vector reader + private boolean 
isVectorReader; + private T columnarBatch; + + // metadata + private CarbonTable carbonTable; + private CarbonColumn[] carbonColumns; + // input + private QueryModel queryModel; + private CarbonReadSupport<T> readSupport; + private FileSplit fileSplit; + private Configuration hadoopConf; + // the index is schema ordinal, the value is the csv ordinal + private int[] schema2csvIdx; + + // filter + private FilterExecuter filter; + // the index is the dimension ordinal, the value is the schema ordinal + private int[] filterColumn2SchemaIdx; + private Object[] internalValues; + private RowIntf internalRow; + + // output + private CarbonColumn[] projection; + // the index is the projection column ordinal, the value is the schema ordinal + private int[] projectionColumn2SchemaIdx; + private Object[] outputValues; + private Object[][] batchOutputValues; + private T outputRow; + + // inputMetricsStats + private InputMetricsStats inputMetricsStats; + + // scan + private Reader reader; + private CsvParser csvParser; + + public CsvRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport) { + this.queryModel = queryModel; + this.readSupport = readSupport; + } + + public CsvRecordReader(QueryModel queryModel, CarbonReadSupport readSupport, + InputMetricsStats inputMetricsStats) { + this(queryModel, readSupport); + this.inputMetricsStats = inputMetricsStats; + } + + public boolean isVectorReader() { + return isVectorReader; + } + + public void setVectorReader(boolean vectorReader) { + isVectorReader = vectorReader; + } + + public void setQueryModel(QueryModel queryModel) { + this.queryModel = queryModel; + } + + public void setInputMetricsStats(InputMetricsStats inputMetricsStats) { + this.inputMetricsStats = inputMetricsStats; + } + + public void setReadSupport(CarbonReadSupport<T> readSupport) { + this.readSupport = readSupport; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + if 
(split instanceof CarbonInputSplit) { + fileSplit = (CarbonInputSplit) split; + } else if (split instanceof CarbonMultiBlockSplit) { + fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0); + } else { + fileSplit = (FileSplit) split; + } + + hadoopConf = context.getConfiguration(); + if (queryModel == null) { + CarbonTableInputFormat inputFormat = new CarbonTableInputFormat<Object>(); + queryModel = inputFormat.createQueryModel(split, context); + } + + carbonTable = queryModel.getTable(); + + // since the sequence of csv header, schema, carbon internal row, projection are different, + // we need to init the column mappings + initializedIdxMapping(); + + // init filter + if (null != queryModel.getFilterExpressionResolverTree()) { + initializeFilter(); + } + + // init reading + initializeCsvReader(); + + this.readSupport.initialize(projection, carbonTable); + } + + private void initializedIdxMapping() { + carbonColumns = + carbonTable.getCreateOrderColumn(carbonTable.getTableName()).toArray(new CarbonColumn[0]); + // for schema to csv mapping + schema2csvIdx = new int[carbonColumns.length]; + if (!carbonTable.getTableInfo().getFormatProperties().containsKey( + FileFormatProperties.CSV.HEADER)) { + // if no header specified, it means that they are the same + LOGGER.info("no header specified, will take the schema from table as header"); + for (int i = 0; i < carbonColumns.length; i++) { + schema2csvIdx[i] = i; + } + } else { + String[] csvHeader = carbonTable.getTableInfo().getFormatProperties().get( + FileFormatProperties.CSV.HEADER).split(","); + for (int i = 0; i < csvHeader.length; i++) { + boolean found = false; + for (int j = 0; j < carbonColumns.length; j++) { + if (StringUtils.strip(csvHeader[i]).equalsIgnoreCase(carbonColumns[j].getColName())) { + schema2csvIdx[carbonColumns[j].getSchemaOrdinal()] = i; + found = true; + break; + } + } + if (!found) { + throw new RuntimeException( + String.format("Can not find csv header '%s' in table fields", 
csvHeader[i])); + } + } + } + + // for carbon internal row to schema mapping + filterColumn2SchemaIdx = new int[carbonColumns.length]; + int filterIdx = 0; + for (CarbonDimension dimension : carbonTable.getDimensions()) { + filterColumn2SchemaIdx[filterIdx++] = dimension.getSchemaOrdinal(); + } + for (CarbonMeasure measure : carbonTable.getMeasures()) { + filterColumn2SchemaIdx[filterIdx++] = measure.getSchemaOrdinal(); + } + + // for projects to schema mapping + projection = queryModel.getProjectionColumns(); + projectionColumn2SchemaIdx = new int[projection.length]; + + for (int i = 0; i < projection.length; i++) { + for (int j = 0; j < carbonColumns.length; j++) { + if (projection[i].getColName().equals(carbonColumns[j].getColName())) { + projectionColumn2SchemaIdx[i] = projection[i].getSchemaOrdinal(); + break; + } + } + } + } + + private void initializeFilter() { + List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil + .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()), + carbonTable.getMeasureByTableName(carbonTable.getTableName())); + int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()]; + for (int i = 0; i < dimLensWithComplex.length; i++) { + dimLensWithComplex[i] = Integer.MAX_VALUE; + } + + int[] dictionaryColumnCardinality = + CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList); + SegmentProperties segmentProperties = + new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality); + Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>(); + + FilterResolverIntf resolverIntf = queryModel.getFilterExpressionResolverTree(); + filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties, + complexDimensionInfoMap); + // for row filter, we need update column index + FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(), + carbonTable.getDimensionOrdinalMax()); + } + + private void initializeCsvReader() throws 
IOException { + internalValues = new Object[carbonColumns.length]; + internalRow = new RowImpl(); + internalRow.setValues(internalValues); + + outputValues = new Object[projection.length]; + batchOutputValues = new Object[MAX_BATCH_SIZE][projection.length]; + + Path file = fileSplit.getPath(); + FileSystem fs = file.getFileSystem(hadoopConf); + int bufferSize = Integer.parseInt( + hadoopConf.get(CSVInputFormat.READ_BUFFER_SIZE, CSVInputFormat.READ_BUFFER_SIZE_DEFAULT)); + // note that here we read the whole file, not a split of the file + FSDataInputStream fsStream = fs.open(file, bufferSize); + reader = new InputStreamReader(fsStream, CarbonCommonConstants.DEFAULT_CHARSET); + // use default csv settings first, then update it using user specified properties later + CsvParserSettings settings = CSVInputFormat.extractCsvParserSettings(hadoopConf); + initCsvSettings(settings); + csvParser = new CsvParser(settings); + csvParser.beginParsing(reader); + } + + /** + * update the settings using properties from user + */ + private void initCsvSettings(CsvParserSettings settings) { + Map<String, String> csvProperties = carbonTable.getTableInfo().getFormatProperties(); + + if (csvProperties.containsKey(FileFormatProperties.CSV.DELIMITER)) { + settings.getFormat().setDelimiter( + csvProperties.get(FileFormatProperties.CSV.DELIMITER).charAt(0)); + } + + if (csvProperties.containsKey(FileFormatProperties.CSV.COMMENT)) { + settings.getFormat().setComment( + csvProperties.get(FileFormatProperties.CSV.COMMENT).charAt(0)); + } + + if (csvProperties.containsKey(FileFormatProperties.CSV.QUOTE)) { + settings.getFormat().setQuote( + csvProperties.get(FileFormatProperties.CSV.QUOTE).charAt(0)); + } + + if (csvProperties.containsKey(FileFormatProperties.CSV.ESCAPE)) { + settings.getFormat().setQuoteEscape( + csvProperties.get(FileFormatProperties.CSV.ESCAPE).charAt(0)); + } + + if (csvProperties.containsKey(FileFormatProperties.CSV.SKIP_EMPTY_LINE)) { + settings.setSkipEmptyLines( + 
Boolean.parseBoolean(csvProperties.get(FileFormatProperties.CSV.SKIP_EMPTY_LINE))); + } + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (isVectorReader) { + return nextColumnarBatch(); + } + + return nextRow(); + } + + private boolean nextColumnarBatch() throws IOException { + return scanAndFillBatch(); + } + + private boolean scanAndFillBatch() throws IOException { + int rowNum = 0; + if (null == filter) { + while (readRowFromFile() && rowNum < MAX_BATCH_SIZE) { + System.arraycopy(outputValues, 0, batchOutputValues[rowNum++], 0, outputValues.length); + } + } else { + try { + while (readRowFromFile() && rowNum < MAX_BATCH_SIZE) { + if (filter.applyFilter(internalRow, carbonTable.getDimensionOrdinalMax())) { + System.arraycopy(outputValues, 0, batchOutputValues[rowNum++], 0, outputValues.length); + } + } + } catch (FilterUnsupportedException e) { + throw new IOException("Failed to filter row in CarbonCsvRecordReader", e); + } + } + if (rowNum < MAX_BATCH_SIZE) { + Object[][] tmpBatchOutputValues = new Object[rowNum][]; + for (int i = 0; i < rowNum; i++) { + tmpBatchOutputValues[i] = batchOutputValues[i]; + } + System.arraycopy(batchOutputValues, 0, tmpBatchOutputValues, 0, rowNum); + for (int i = 0; i < tmpBatchOutputValues.length; i++) { + } + columnarBatch = readSupport.readRow(tmpBatchOutputValues); + } else { + columnarBatch = readSupport.readRow(batchOutputValues); + } + return rowNum > 0; + } + + private boolean nextRow() throws IOException { + if (csvParser == null) { + return false; + } + + if (!readRowFromFile()) { + return false; + } + + if (null == filter) { + putRowToSparkRow(); + return true; + } else { + try { + boolean scanMore; + do { + scanMore = !filter.applyFilter(internalRow, carbonTable.getDimensionOrdinalMax()); + if (!scanMore) { + putRowToSparkRow(); + return true; + } + } while (readRowFromFile()); + // if we read the end of file and still need scanMore, it means that there is no row + return 
false; + } catch (FilterUnsupportedException e) { + throw new IOException("Failed to filter row in CarbonCsvRecordReader", e); + } + } + } + + private void putRowToSparkRow() { --- End diff -- OK~ --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2374#discussion_r196625258 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CsvRecordReader.java --- @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.UnsupportedEncodingException; +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.intf.RowImpl; +import org.apache.carbondata.core.scan.filter.intf.RowIntf; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.statusmanager.FileFormatProperties; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; + +import com.univocity.parsers.csv.CsvParser; +import com.univocity.parsers.csv.CsvParserSettings; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +/** + * scan csv file and filter on it + */ +@InterfaceStability.Evolving +@InterfaceAudience.Internal +public class CsvRecordReader<T> extends AbstractRecordReader<T> { --- End diff -- The procedure is alike, but the implementation is quite different. The most important parts are converting the original data to the internal row and converting the original data to the output row. 
In StreamRecordReader the original source is in ROW_V1 format, while in CsvRecordReader it is in CSV format. Besides, StreamRecordReader handles more details, such as 'syncMark' and 'rawRow', which we do not need for CSV. Maybe we can extract the common code into utils or create a new abstraction for ReadSupport or RecordReader. --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2374#discussion_r196625604 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java --- @@ -174,9 +174,15 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO List<InputSplit> result = new LinkedList<InputSplit>(); // for each segment fetch blocks matching filter in Driver BTree - List<CarbonInputSplit> dataBlocksOfSegment = - getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions, - validSegments, partitionInfo, oldPartitionIdList); + List<CarbonInputSplit> dataBlocksOfSegment; + if (carbonTable.getTableInfo().getFormat().equals("") --- End diff -- The default value of format is 'carbondata', so there is no need to handle the empty string. I will remove this check. --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2374#discussion_r196625677 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java --- @@ -515,12 +574,73 @@ private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) th return split; } + private List<CarbonInputSplit> convertToInputSplit4ExternalFormat(JobContext jobContext, + ExtendedBlocklet extendedBlocklet) throws IOException { + List<CarbonInputSplit> splits = new ArrayList<CarbonInputSplit>(); + String factFilePath = extendedBlocklet.getFilePath(); + Path path = new Path(factFilePath); + FileSystem fs = FileFactory.getFileSystem(path); + FileStatus fileStatus = fs.getFileStatus(path); + long length = fileStatus.getLen(); + if (length != 0) { + BlockLocation[] blkLocations = fs.getFileBlockLocations(path, 0, length); + long blkSize = fileStatus.getBlockSize(); + long minSplitSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(jobContext)); + long maxSplitSize = getMaxSplitSize(jobContext); + long splitSize = computeSplitSize(blkSize, minSplitSize, maxSplitSize); + long bytesRemaining = fileStatus.getLen(); + while (((double) bytesRemaining) / splitSize > 1.1) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path, + length - bytesRemaining, + splitSize, blkLocations[blkIndex].getHosts(), + blkLocations[blkIndex].getCachedHosts(), FileFormat.EXTERNAL)); + bytesRemaining -= splitSize; + } + if (bytesRemaining != 0) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path, + length - bytesRemaining, + bytesRemaining, blkLocations[blkIndex].getHosts(), + blkLocations[blkIndex].getCachedHosts(), FileFormat.EXTERNAL)); + } + } else { + splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path, 0, length, + new String[0], FileFormat.EXTERNAL)); + } + return splits; + } + @Override 
public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { Configuration configuration = taskAttemptContext.getConfiguration(); QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext); CarbonReadSupport<T> readSupport = getReadSupportClass(configuration); - return new CarbonRecordReader<T>(queryModel, readSupport); + if (inputSplit instanceof CarbonMultiBlockSplit + && ((CarbonMultiBlockSplit) inputSplit).getFileFormat() == FileFormat.EXTERNAL) { + return createRecordReaderForExternalFormat(queryModel, readSupport, + configuration.get(CarbonCommonConstants.CARBON_EXTERNAL_FORMAT_CONF_KEY)); + } else if (inputSplit instanceof CarbonInputSplit + && ((CarbonInputSplit) inputSplit).getFileFormat() == FileFormat.EXTERNAL) { + return createRecordReaderForExternalFormat(queryModel, readSupport, + configuration.get(CarbonCommonConstants.CARBON_EXTERNAL_FORMAT_CONF_KEY)); + } else { + return new CarbonRecordReader<T>(queryModel, readSupport); + } + } + + @Since("1.4.1") --- End diff -- OK --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2374#discussion_r196626156 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala --- @@ -426,6 +439,22 @@ class CarbonScanRDD[T: ClassTag]( CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId)) streamReader.setQueryModel(model) streamReader + case FileFormat.EXTERNAL => + assert(storageFormat.equals("csv"), --- End diff -- OK~ --- |
Free forum by Nabble | Edit this page |