[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Reading two sdk writer outp...

classic Classic list List threaded Threaded
45 messages Options
123
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...

qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2273#discussion_r186995311
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---
    @@ -2311,14 +2312,14 @@ static DataType thriftDataTyopeToWrapperDataType(
         }
       }
     
    -  public static List<String> getFilePathExternalFilePath(String path) {
    +  public static List<String> getFilePathExternalFilePath(String path, final String fileExtension) {
    --- End diff --
   
    This change is not required


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2273#discussion_r187007180
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
    @@ -151,6 +154,33 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
         SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
         SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
             .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
    +
    +    // For NonTransactional table, compare the schema of all index files with inferred schema.
    +    // If there is a mismatch throw exception. As all files must be of same schema.
    +    if (!carbonTable.getTableInfo().isTransactionalTable()) {
    +      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
    +      for (Segment segment : segments.getValidSegments()) {
    +        Map<String, String> indexFiles = segment.getCommittedIndexFile();
    +        for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
    +          Path indexFile = new Path(indexFileEntry.getKey());
    +          org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile(
    +              indexFile.toString(), carbonTable.getTableName());
    +          TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
    +              tableInfo, identifier.getDatabaseName(),
    +              identifier.getTableName(),
    +              identifier.getTablePath());
    +          List<ColumnSchema> indexFileColumnList =
    +              wrapperTableInfo.getFactTable().getListOfColumns();
    +          List<ColumnSchema> tableColumnList =
    +              carbonTable.getTableInfo().getFactTable().getListOfColumns();
    +          if (!compareColumnSchemaList(indexFileColumnList, tableColumnList)) {
    +            throw new IOException("All the files schema doesn't match. "
    --- End diff --
   
    @kunal642 : For nonTransactional tables, we support many sdk writers output files to be placed and read from same folder. This works when schema is same, If schema is different we have to inform user that these files are not of same type. If we just ignore fiels how user know why it is ignored ? hence the exception


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2273#discussion_r187030343
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
    @@ -151,6 +154,33 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
         SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
         SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
             .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
    +
    +    // For NonTransactional table, compare the schema of all index files with inferred schema.
    +    // If there is a mismatch throw exception. As all files must be of same schema.
    +    if (!carbonTable.getTableInfo().isTransactionalTable()) {
    +      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
    +      for (Segment segment : segments.getValidSegments()) {
    +        Map<String, String> indexFiles = segment.getCommittedIndexFile();
    +        for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
    +          Path indexFile = new Path(indexFileEntry.getKey());
    +          org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile(
    +              indexFile.toString(), carbonTable.getTableName());
    +          TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
    +              tableInfo, identifier.getDatabaseName(),
    +              identifier.getTableName(),
    +              identifier.getTablePath());
    +          List<ColumnSchema> indexFileColumnList =
    +              wrapperTableInfo.getFactTable().getListOfColumns();
    +          List<ColumnSchema> tableColumnList =
    +              carbonTable.getTableInfo().getFactTable().getListOfColumns();
    +          if (!compareColumnSchemaList(indexFileColumnList, tableColumnList)) {
    +            throw new IOException("All the files schema doesn't match. "
    --- End diff --
   
    @kunal642 I agree with you. The purpose of SDK is to read whatever file is present. In case there is a mismatch in the schema we should not block the output of the files are having correct schema.
    Also, in future we are going to support Merge Schema and show the output in case of different schema.  
    Better to show the output of how much can be read with the correct schema and also throw a warning or print the log for the presence of different schema in the log.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user gvramana commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2273#discussion_r187071062
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
    @@ -151,6 +154,33 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
         SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
         SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
             .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
    +
    +    // For NonTransactional table, compare the schema of all index files with inferred schema.
    +    // If there is a mismatch throw exception. As all files must be of same schema.
    +    if (!carbonTable.getTableInfo().isTransactionalTable()) {
    +      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
    +      for (Segment segment : segments.getValidSegments()) {
    +        Map<String, String> indexFiles = segment.getCommittedIndexFile();
    +        for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
    +          Path indexFile = new Path(indexFileEntry.getKey());
    +          org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile(
    +              indexFile.toString(), carbonTable.getTableName());
    +          TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
    +              tableInfo, identifier.getDatabaseName(),
    +              identifier.getTableName(),
    +              identifier.getTablePath());
    +          List<ColumnSchema> indexFileColumnList =
    +              wrapperTableInfo.getFactTable().getListOfColumns();
    +          List<ColumnSchema> tableColumnList =
    +              carbonTable.getTableInfo().getFactTable().getListOfColumns();
    +          if (!compareColumnSchemaList(indexFileColumnList, tableColumnList)) {
    +            throw new IOException("All the files schema doesn't match. "
    --- End diff --
   
    @kunal642 , @sounakr. Data files should not be skipped, clear error should be given to user. Otherwise user thinks that result is correct and is computed considering all files. Along with exception, which file has data mismatch also needs to be logged for him to analyse further and fix.
    later carbon print tool will be provided for him to check schema of each carbondata file, which will help user to debug problem.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2273#discussion_r187116998
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
    @@ -151,6 +154,33 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
         SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
         SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
             .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
    +
    +    // For NonTransactional table, compare the schema of all index files with inferred schema.
    +    // If there is a mismatch throw exception. As all files must be of same schema.
    +    if (!carbonTable.getTableInfo().isTransactionalTable()) {
    +      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
    +      for (Segment segment : segments.getValidSegments()) {
    +        Map<String, String> indexFiles = segment.getCommittedIndexFile();
    +        for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
    +          Path indexFile = new Path(indexFileEntry.getKey());
    +          org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile(
    +              indexFile.toString(), carbonTable.getTableName());
    +          TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
    +              tableInfo, identifier.getDatabaseName(),
    +              identifier.getTableName(),
    +              identifier.getTablePath());
    +          List<ColumnSchema> indexFileColumnList =
    +              wrapperTableInfo.getFactTable().getListOfColumns();
    +          List<ColumnSchema> tableColumnList =
    +              carbonTable.getTableInfo().getFactTable().getListOfColumns();
    +          if (!compareColumnSchemaList(indexFileColumnList, tableColumnList)) {
    +            throw new IOException("All the files schema doesn't match. "
    --- End diff --
   
    @kunal642 @sounakr I agree with @gvramana,  skipping data file is not correct as it will miss some records which will not be acceptable. Blocking user while writing is not possible. I think throwing exception is correct.
    @ajantha-bhat Can u please check how Parquet works in similar scenario.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2273: [CARBONDATA-2442] Fixed: Reading two sdk writer outp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on the issue:

    https://github.com/apache/carbondata/pull/2273
 
    retest this please


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2273#discussion_r187136017
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
    @@ -151,6 +154,33 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
         SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
         SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
             .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
    +
    +    // For NonTransactional table, compare the schema of all index files with inferred schema.
    +    // If there is a mismatch throw exception. As all files must be of same schema.
    +    if (!carbonTable.getTableInfo().isTransactionalTable()) {
    +      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
    +      for (Segment segment : segments.getValidSegments()) {
    +        Map<String, String> indexFiles = segment.getCommittedIndexFile();
    +        for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
    +          Path indexFile = new Path(indexFileEntry.getKey());
    +          org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile(
    +              indexFile.toString(), carbonTable.getTableName());
    +          TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
    +              tableInfo, identifier.getDatabaseName(),
    +              identifier.getTableName(),
    +              identifier.getTablePath());
    +          List<ColumnSchema> indexFileColumnList =
    +              wrapperTableInfo.getFactTable().getListOfColumns();
    +          List<ColumnSchema> tableColumnList =
    +              carbonTable.getTableInfo().getFactTable().getListOfColumns();
    +          if (!compareColumnSchemaList(indexFileColumnList, tableColumnList)) {
    +            throw new IOException("All the files schema doesn't match. "
    --- End diff --
   
    @kumarvishal09 :Tested with parquet by having 2 files with same column name but different data type. parquet throws java.lang.UnsupportedOperationException during read.
   
    Caused by: java.lang.UnsupportedOperationException: Unimplemented type: StringType
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readIntBatch(VectorizedColumnReader.java:369)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:188)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:230)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
   
   



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2273: [CARBONDATA-2442] Fixed: Reading two sdk writer outp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2273
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5782/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2273: [CARBONDATA-2442] Fixed: Reading two sdk writer outp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2273
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4626/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2273: [CARBONDATA-2442] Fixed: Reading two sdk writer outp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2273
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4835/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2273: [CARBONDATA-2442] Fixed: Reading two sdk writer outp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on the issue:

    https://github.com/apache/carbondata/pull/2273
 
    LGTM



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2273: [CARBONDATA-2442] Fixed: Reading two sdk writer outp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on the issue:

    https://github.com/apache/carbondata/pull/2273
 
    @ajantha-bhat Please rebase


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2273: [CARBONDATA-2442] and [CARBONDATA-2469] Fixed: Readi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on the issue:

    https://github.com/apache/carbondata/pull/2273
 
    retest this please


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2273: [CARBONDATA-2442] and [CARBONDATA-2469] Fixed: Readi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on the issue:

    https://github.com/apache/carbondata/pull/2273
 
    retest this please


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2273: [CARBONDATA-2442] and [CARBONDATA-2469] Fixed: multi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2273
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5811/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2273: [CARBONDATA-2442] and [CARBONDATA-2469] Fixed: multi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2273
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4654/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2273: [CARBONDATA-2442] and [CARBONDATA-2469] Fixed...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2273#discussion_r187397546
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
    @@ -122,6 +122,11 @@ private String getSegmentID(String carbonIndexFileName, String indexFilePath) {
       @Override public void takeCarbonIndexFileSnapShot() throws IOException {
         // Read the current file Path get the list of indexes from the path.
         CarbonFile file = FileFactory.getCarbonFile(carbonFilePath);
    +    if (file == null) {
    +      // For nonTransactional table, files can be removed at any point of time.
    +      // So cannot assume files will be present
    +      throw new IOException("No files are present in the table location");
    --- End diff --
   
    append File path in exception message


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2273: [CARBONDATA-2442] and [CARBONDATA-2469] Fixed: multi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on the issue:

    https://github.com/apache/carbondata/pull/2273
 
    LGTM


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2273: [CARBONDATA-2442] and [CARBONDATA-2469] Fixed: multi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on the issue:

    https://github.com/apache/carbondata/pull/2273
 
    LGTM


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2273: [CARBONDATA-2442] and [CARBONDATA-2469] Fixed...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user asfgit closed the pull request at:

    https://github.com/apache/carbondata/pull/2273


---
123