GitHub user QiangCai opened a pull request:
https://github.com/apache/incubator-carbondata/pull/528

Fix InsertInto test case for spark2

Changes:
1. Move the insertInto test case from the spark module to the spark-common-test module
2. Add a test case: insert into a carbon table from a union query over carbon tables
3. CarbonDecoderOptimizerHelper supports InsertIntoTable for spark2
4. CreateTable and CarbonRelation use the original ordinal of columns for spark2
5. Optimize CsvInput for InsertInto to avoid allocating too much memory at once

Impact:
1. data loading
2. query

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/QiangCai/incubator-carbondata fixInsertIntoFromUnionQuery

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-carbondata/pull/528.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #528

----
commit 96e94fb967936192520f64dd8404e148c7e5fad2
Author: QiangCai <[hidden email]>
Date:   2017-01-12T17:26:30Z

    fix InsertInto issue for spark2
----
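For readers following along, change 2 is the scenario the PR branch is named after: an INSERT whose source is a UNION over carbon tables. A minimal sketch of such a test, assuming a Carbon-enabled SparkSession and hypothetical table names and schemas (the real test lives in the spark-common-test module):

    import org.apache.spark.sql.SparkSession

    // Sketch only: table names/schemas are illustrative, and the session is
    // assumed to have the CarbonData extensions already registered.
    object InsertFromUnionSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("insert-from-union-sketch")
          .master("local[2]")
          .getOrCreate()

        spark.sql("CREATE TABLE IF NOT EXISTS src_a (id INT, name STRING) STORED BY 'carbondata'")
        spark.sql("CREATE TABLE IF NOT EXISTS src_b (id INT, name STRING) STORED BY 'carbondata'")
        spark.sql("CREATE TABLE IF NOT EXISTS target (id INT, name STRING) STORED BY 'carbondata'")

        // The case exercised by the new test: insert into a carbon table
        // from a union query over carbon tables.
        spark.sql(
          """INSERT INTO TABLE target
            |SELECT id, name FROM src_a
            |UNION ALL
            |SELECT id, name FROM src_b""".stripMargin)

        spark.sql("SELECT COUNT(*) FROM target").show()
        spark.stop()
      }
    }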
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/528

Build Failed with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/569/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/528

Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/579/
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/528#discussion_r95944775

--- Diff: processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java ---
@@ -193,28 +195,26 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws K
   }

   class RddScanCallable implements Callable<Void> {
-    List<JavaRddIterator<String[]>> iterList;
-
-    RddScanCallable() {
-      this.iterList = new ArrayList<JavaRddIterator<String[]>>(1000);
-    }
-
-    public void addJavaRddIterator(JavaRddIterator<String[]> iter) {
-      this.iterList.add(iter);
-    }
-
     @Override public Void call() throws Exception {
       StandardLogService.setThreadName(("PROCESS_DataFrame_PARTITIONS"),
           Thread.currentThread().getName());
       try {
         String[] values = null;
-        for (JavaRddIterator<String[]> iter: iterList) {
-          iter.initialize();
-          while (iter.hasNext()) {
-            values = iter.next();
-            synchronized (putRowLock) {
-              putRow(data.outputRowMeta, values);
+        boolean hasNext = true;
+        JavaRddIterator<String[]> iter;
+        boolean isInitialized = false;
+        while(hasNext) {
--- End diff --

please add space after `while`
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/528#discussion_r95956671

--- Diff: integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala ---
@@ -237,9 +237,15 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
        val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper]
        val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper]
        union.left.output.foreach(attr =>
--- End diff --

use `{` instead of `(`
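The style point being requested is the usual Scala convention that a multi-line lambda takes braces rather than parentheses. A tiny standalone illustration (the Attr type and collections below are stand-ins, not the real plan attributes):

    import scala.collection.mutable

    object ForeachStyleSketch extends App {
      // Stand-ins for union.left.output and leftCondAttrs.
      case class Attr(name: String)
      val leftOutput = Seq(Attr("a"), Attr("b"))
      val leftCondAttrs = mutable.HashSet[Attr]()

      // Compiles, but parentheses around a multi-line body read poorly:
      leftOutput.foreach(attr =>
        leftCondAttrs += attr
      )

      // The form the reviewer asks for: braces around the block.
      leftOutput.foreach { attr =>
        leftCondAttrs += attr
      }
    }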
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/528#discussion_r95956802

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala ---
@@ -782,7 +781,49 @@ case class CarbonRelation(
          nullable = true)())
  }

-  override val output = dimensionsAttr ++ measureAttr
+  override val output = {
+    val columns = tableMeta.carbonTable.getCreateOrderColumn(tableMeta.carbonTable.getFactTableName)
+      .asScala
+    columns.filter(!_.isInvisible).map { column =>
+      if (column.isDimesion()) {
+        val output: DataType = column.getDataType.toString.toLowerCase match {
+          case "array" =>
+            CarbonMetastoreTypes.toDataType(s"array<${getArrayChildren(column.getColName)}>")
+          case "struct" =>
+            CarbonMetastoreTypes.toDataType(s"struct<${getStructChildren(column.getColName)}>")
+          case dType =>
+            val dataType = addDecimalScaleAndPrecision(column, dType)
+            CarbonMetastoreTypes.toDataType(dataType)
+        }
+        AttributeReference(column.getColName, output,
+          nullable = true
+        )(qualifier = Option(tableName + "." + column.getColName))
--- End diff --

move to previous line
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/528#discussion_r95956880

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala ---
(same hunk as quoted above, continuing)
+      } else {
+        AttributeReference(column.getColName, CarbonMetastoreTypes.toDataType(
--- End diff --

move `CarbonMetastoreTypes.toDataType` to previous line to make it more readable
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/528#discussion_r95956920

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala ---
(same hunk as quoted above, continuing)
+      } else {
+        AttributeReference(column.getColName, CarbonMetastoreTypes.toDataType(
+          column.getDataType.toString
+            .toLowerCase match {
+            case "int" => "long"
+            case "short" => "long"
+            case "decimal" => "decimal(" + column.getColumnSchema.getPrecision + "," + column
+              .getColumnSchema.getScale + ")"
+            case others => others
+          }
+        ),
+          nullable = true
+        )(qualifier = Option(tableName + "." + column.getColName))
+      }
+    }
+  }
+
+  def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = {
+    var dType = dataType
+    if (dimval.getDataType
+        == org.apache.carbondata.core.carbon.metadata.datatype.DataType.DECIMAL) {
--- End diff --

remove the explicit package name
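Stepping back from the style nits, the change under review here is item 4 of the PR description: CarbonRelation now derives its output attributes from the table's create-order columns instead of concatenating dimension and measure attributes. A self-contained sketch of that idea, using simplified stand-in types rather than the real CarbonData and Spark classes:

    // Sketch of the new CarbonRelation.output logic: emit attributes in the
    // table's create order and map Carbon type names to Spark SQL type strings.
    object CreateOrderOutputSketch {
      final case class Column(name: String, dataType: String, invisible: Boolean,
                              isDimension: Boolean, precision: Int = 0, scale: Int = 0)
      final case class Attribute(name: String, sparkType: String, qualifier: String)

      def output(tableName: String, createOrderColumns: Seq[Column]): Seq[Attribute] = {
        createOrderColumns.filterNot(_.invisible).map { column =>
          val sparkType =
            if (column.isDimension) {
              column.dataType.toLowerCase match {
                case "decimal" => s"decimal(${column.precision},${column.scale})"
                case other     => other            // array/struct handling elided
              }
            } else {
              // Measures: int and short are widened to long, mirroring the
              // mapping in the diff above; decimal keeps precision/scale.
              column.dataType.toLowerCase match {
                case "int" | "short" => "long"
                case "decimal"       => s"decimal(${column.precision},${column.scale})"
                case other           => other
              }
            }
          Attribute(column.name, sparkType, s"$tableName.${column.name}")
        }
      }

      def main(args: Array[String]): Unit = {
        val cols = Seq(
          Column("name", "string", invisible = false, isDimension = true),
          Column("salary", "decimal", invisible = false, isDimension = false, precision = 10, scale = 2),
          Column("age", "int", invisible = false, isDimension = false))
        output("emp", cols).foreach(println)   // attributes come out in create order
      }
    }

The point of emitting attributes in create order is that the relation's schema then lines up positionally with the columns as declared at create time, which is what an INSERT INTO ... SELECT relies on.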
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/528#discussion_r95973866

--- Diff: processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java ---
(quoting the same hunk as the review comment above)
--- End diff --

fixed
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/528#discussion_r95973885

--- Diff: integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala ---
(quoting the same hunk as the review comment above)
--- End diff --

fixed
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/528#discussion_r95973907

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala ---
(quoting the same hunk as the review comment above)
--- End diff --

fixed
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/528#discussion_r95973928

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala ---
(quoting the same hunk as the review comment above)
--- End diff --

fixed
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/528#discussion_r95976060

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala ---
(quoting the same hunk as the review comment above)
--- End diff --

fixed
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/528

Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/590/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/528

Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/591/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/528

Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/597/
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/528#discussion_r96108486

--- Diff: processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java ---
@@ -194,28 +196,25 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws K
   }

   class RddScanCallable implements Callable<Void> {
-    List<CarbonIterator<String[]>> iterList;
-
-    RddScanCallable() {
-      this.iterList = new ArrayList<CarbonIterator<String[]>>(1000);
-    }
-
-    public void addJavaRddIterator(CarbonIterator<String[]> iter) {
-      this.iterList.add(iter);
-    }
-
-    @Override
-    public Void call() throws Exception {
-      StandardLogService.setThreadName(("PROCESS_DataFrame_PARTITIONS"),
-          Thread.currentThread().getName());
+    @Override public Void call() throws Exception {
+      StandardLogService
+          .setThreadName(("PROCESS_DataFrame_PARTITIONS"), Thread.currentThread().getName());
       try {
         String[] values = null;
-        for (CarbonIterator<String[]> iter: iterList) {
-          iter.initialize();
-          while (iter.hasNext()) {
-            values = iter.next();
-            synchronized (putRowLock) {
-              putRow(data.outputRowMeta, values);
+        boolean hasNext = true;
+        CarbonIterator<String[]> iter;
+        boolean isInitialized = false;
+        while (hasNext) {
+          iter = getRddIterator(isInitialized);
--- End diff --

can you provide some comment in the code to describe this while loop
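The comment being asked for boils down to why the loop exists at all: item 5 of the PR description, pulling one partition iterator at a time instead of pre-buffering up to 1000 of them per callable. The real code is Java; what follows is only a Scala sketch of that control flow, with a made-up concurrent queue standing in for whatever getRddIterator actually draws from:

    import java.util.concurrent.ConcurrentLinkedQueue

    object RddScanSketch {
      type RowIterator = Iterator[Array[String]]

      // Stand-in source of partition iterators; the real getRddIterator
      // presumably hands out the next one (or nothing) on demand.
      private val pending = new ConcurrentLinkedQueue[RowIterator]()

      private def getRddIterator(): RowIterator = pending.poll() // null when exhausted

      def scan(putRow: Array[String] => Unit): Unit = {
        var iter = getRddIterator()
        // Pull one partition iterator at a time instead of buffering them all
        // up front, so memory stays bounded regardless of partition count.
        while (iter != null) {
          iter.foreach(putRow)
          iter = getRddIterator()
        }
      }

      def main(args: Array[String]): Unit = {
        pending.add(Iterator(Array("1", "a"), Array("2", "b")))
        pending.add(Iterator(Array("3", "c")))
        scan(row => println(row.mkString(",")))
      }
    }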
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/528#discussion_r96108503

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala ---
@@ -782,7 +782,47 @@ case class CarbonRelation(
          nullable = true)())
  }

-  override val output = dimensionsAttr ++ measureAttr
+  override val output = {
+    val columns = tableMeta.carbonTable.getCreateOrderColumn(tableMeta.carbonTable.getFactTableName)
+      .asScala
+    columns.filter(!_.isInvisible).map { column =>
+      if (column.isDimesion()) {
+        val output: DataType = column.getDataType.toString.toLowerCase match {
+          case "array" =>
+            CarbonMetastoreTypes.toDataType(s"array<${getArrayChildren(column.getColName)}>")
+          case "struct" =>
+            CarbonMetastoreTypes.toDataType(s"struct<${getStructChildren(column.getColName)}>")
+          case dType =>
+            val dataType = addDecimalScaleAndPrecision(column, dType)
+            CarbonMetastoreTypes.toDataType(dataType)
+        }
+        AttributeReference(column.getColName, output, nullable = true )(
+          qualifier = Option(tableName + "." + column.getColName))
+      } else {
+        val output = CarbonMetastoreTypes.toDataType {
--- End diff --

can you add comment in code to describe this conversion
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/528#discussion_r96108506

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala ---
(same hunk as quoted above, ending at)
+    columns.filter(!_.isInvisible).map { column =>
--- End diff --

can you add comment in code to describe this conversion
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/528#discussion_r96129389

--- Diff: processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java ---
(quoting the same hunk as the review comment above)
--- End diff --

ok