Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1605
Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/533/
---
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1605
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2158/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1605
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2161/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1605
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2162/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1605
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1791/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1605
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2164/
---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r155436882
--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util.concurrent.ExecutorService
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.command.CompactionModel
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+
+abstract class Compactable(carbonLoadModel: CarbonLoadModel,
--- End diff --
Name this class Compactor; the other Compactor class can be moved to CarbonTableCompactor.
---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r155437020
--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util
+import java.util.concurrent.{ExecutorService, Future}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
+import org.apache.carbondata.spark.compaction.CompactionCallable
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * This class is used to perform compaction on carbon table.
+ */
+class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
--- End diff --
Move all the compactor interfaces and implementations to spark-common.
---
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r155441909
--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util
+import java.util.concurrent.{ExecutorService, Future}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
+import org.apache.carbondata.spark.compaction.CompactionCallable
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * This class is used to perform compaction on carbon table.
+ */
+class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
--- End diff --
AggregateDataMapCompactor requires CarbonSession, which is not available in spark-common.
---
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r155444843
--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util.concurrent.ExecutorService
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.command.CompactionModel
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+
+abstract class Compactable(carbonLoadModel: CarbonLoadModel,
--- End diff --
Changed the class name to Compactor.
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1605
Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/553/
---
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r155439799
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
@@ -541,4 +541,20 @@ object PreAggregateUtil {
     }
   }
 
+  def createChildSelectQuery(tableSchema: TableSchema): String = {
+    val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String]
+    val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String]
--- End diff --
Please use ArrayBuffer.empty[String].
---
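One reading of the suggestion above is to import ArrayBuffer once and drop the fully qualified `scala.collection.mutable` prefix at each use. A minimal sketch under that assumption follows; the object and method names are hypothetical and are not part of the pull request.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper, only here to show the shorter ArrayBuffer.empty form.
object ArrayBufferImportSketch {
  def collectNames(names: Seq[String]): ArrayBuffer[String] = {
    // With the import in place, the package prefix is no longer needed.
    val buffer = ArrayBuffer.empty[String]
    names.foreach(buffer += _)
    buffer
  }
}
```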
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r155440643
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
@@ -541,4 +541,20 @@ object PreAggregateUtil {
     }
   }
 
+  def createChildSelectQuery(tableSchema: TableSchema): String = {
+    val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String]
+    val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String]
+    tableSchema.getListOfColumns.asScala.foreach {
+      a => if (a.getAggFunction.nonEmpty) {
+        aggregateColumns += s"${a.getAggFunction match {
+          case "count" => "sum"
+          case _ => a.getAggFunction}}(${a.getColumnName})"
+      } else {
+        groupingExpressions += a.getColumnName
+      }
+    }
+    s"select ${ groupingExpressions.mkString(",") },${ aggregateColumns.mkString(",")
+    } from ${ tableSchema.getTableName } group by ${ groupingExpressions.mkString(",") }"
--- End diff --
Why not require the database name?
---
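To make the question above concrete, here is a rough sketch of how the generated query could be qualified with a database name. It is not the code from the pull request: `ColumnSketch` is a hypothetical stand-in for the real column schema, and the `databaseName` parameter is an assumption about what the reviewer is asking for. As in the diff, `count` is rewritten to `sum`, because re-aggregating rows that already hold counts means adding those counts together.

```scala
// Hypothetical stand-in for one column of the pre-aggregate table schema.
final case class ColumnSketch(name: String, aggFunction: String)

object ChildSelectQuerySketch {
  def createChildSelectQuery(
      databaseName: String,
      tableName: String,
      columns: Seq[ColumnSketch]): String = {
    // Columns with an aggregate function are re-aggregated; the rest are grouping keys.
    val (aggregated, grouping) = columns.partition(_.aggFunction.nonEmpty)
    val aggregateColumns = aggregated.map { c =>
      // Stored counts must be summed when rows are merged again.
      val fn = if (c.aggFunction == "count") "sum" else c.aggFunction
      s"$fn(${c.name})"
    }
    val groupingColumns = grouping.map(_.name)
    val projection = (groupingColumns ++ aggregateColumns).mkString(",")
    // Skip the group by clause when there are no grouping columns.
    val groupBy =
      if (groupingColumns.isEmpty) "" else s" group by ${groupingColumns.mkString(",")}"
    s"select $projection from $databaseName.$tableName$groupBy"
  }
}
```

Qualifying the table name this way would keep the query valid when the session's current database differs from the one that owns the pre-aggregate table.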
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r155440875
--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.rdd
+
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonSession, SQLContext}
+import org.apache.spark.sql.execution.command.CompactionModel
+import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
+
+/**
+ * Used to perform compaction on Aggregate data map.
+ */
+class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
+    compactionModel: CompactionModel,
+    executor: ExecutorService,
+    sqlContext: SQLContext,
+    storeLocation: String)
+  extends Compactable(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {
+
+  override def executeCompaction(): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val loadMetaDataDetails = identifySegmentsToBeMerged()
+    val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
+    if (segments.nonEmpty) {
+      val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadMetaDataDetails).split("_")(1)
+      CarbonSession.threadSet(
+        CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+        carbonLoadModel.getDatabaseName + "." +
+        carbonLoadModel.getTableName,
+        segments.mkString(","))
+      CarbonSession.threadSet(
+        CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+        carbonLoadModel.getDatabaseName + "." +
+        carbonLoadModel.getTableName, "false")
+      val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
+        .map(_.getColumnName).mkString(",")
+      // Creating a new query string to insert data into pre-aggregate table from that same table.
+      // For example: To compact preaggtable1 we can fire a query like insert into preaggtable1
+      // select * from preaggtable1
+      // The following code will generate the select query with a load UDF that will be used to
+      // apply DataLoadingRules
+      val childDataFrame = sqlContext.sparkSession.sql(new CarbonSpark2SqlParser()
+        // adding the aggregation load UDF
+        .addPreAggLoadFunction(PreAggregateUtil
+        // creating the select query on the bases on table schema
+        .createChildSelectQuery(carbonTable.getTableInfo.getFactTable))).drop("preAggLoad")
--- End diff --
Indentation is incorrect.
---
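The diff above limits the select to the segments being merged by setting two thread-level properties keyed by `database.table` before running the query. The sketch below only illustrates how those two keys and values are composed; the prefix strings are assumed values for `CarbonCommonConstants.CARBON_INPUT_SEGMENTS` and `VALIDATE_CARBON_INPUT_SEGMENTS`, and `segmentProperties` is a hypothetical helper rather than a CarbonData API.

```scala
object InputSegmentsSketch {
  // Assumed values of the two CarbonCommonConstants prefixes used in the diff;
  // check the real constants before relying on these strings.
  val InputSegmentsPrefix = "carbon.input.segments."
  val ValidateInputSegmentsPrefix = "validate.carbon.input.segments."

  // Builds the two properties that the diff passes to CarbonSession.threadSet.
  def segmentProperties(
      databaseName: String,
      tableName: String,
      segments: Seq[String]): Map[String, String] = {
    val qualifiedName = s"$databaseName.$tableName"
    Map(
      // Restrict reads to the segments chosen for merging.
      (InputSegmentsPrefix + qualifiedName) -> segments.mkString(","),
      // Turn off segment validation for this table, as the diff does.
      (ValidateInputSegmentsPrefix + qualifiedName) -> "false")
  }
}
```

With the properties set, a select on the pre-aggregate table would read only the listed segments, which is what lets the insert-into-self query act as a compaction.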
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r155448335
--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Matchers._
+
+class TestPreAggregateCompaction extends QueryTest with BeforeAndAfterEach {
+
+  val testData = s"$resourcesPath/sample.csv"
+
+  override def beforeEach(): Unit = {
+    sql("DROP TABLE IF EXISTS maintable")
--- End diff --
Suggest using a new database instead of the default database.
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1605
Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/560/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1605
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1809/
---
Github user kunal642 commented on the issue:
https://github.com/apache/carbondata/pull/1605
Retest this please
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1605
Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/571/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1605
Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/583/
---