GitHub user watermen opened a pull request:
https://github.com/apache/carbondata/pull/910

[WIP] Global sort by spark in load process

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/watermen/incubator-carbondata load

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/910.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #910

----
commit 0c541a3ab425f60cd3d3f45a2389f3c311d89568
Author: Yadong Qi <[hidden email]>
Date: 2017-05-12T06:54:20Z

    Global sort by spark in load process.

----

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at [hidden email] or file a JIRA ticket with INFRA.
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/910

Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/2007/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/910

Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/2010/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/910

Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/2022/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/910

Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/2038/
Github user asfgit commented on the issue:
https://github.com/apache/carbondata/pull/910

Can one of the admins verify this patch?
Github user asfgit commented on the issue:
https://github.com/apache/carbondata/pull/910

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/carbondata-pr-spark-1.6/216/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/910

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/2339/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/910

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/2340/
Github user asfgit commented on the issue:
https://github.com/apache/carbondata/pull/910

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/carbondata-pr-spark-1.6/217/
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/910#discussion_r121300564

    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortHelper.java ---
    @@ -0,0 +1,85 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.processing.newflow.sort;
    +
    +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
    +import org.apache.carbondata.processing.util.NonDictionaryUtil;
    +
    +public class SortHelper {
    --- End diff --

Name it `SortStepRowUtil`, like `WriteStepRowUtil`; it is for changing the format of a carbon row. This behavior may change in the future, so we had better put all such conversions in one place. Please change `SortTempFileChunkHolder.getRowFromStream` to use this utility, and `UnsafeSingleThreadFinalSortFilesMerger.convertRow` as well.
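The suggestion above, keeping the row-format conversion in one utility that every call site delegates to, can be sketched roughly as follows. This is only an illustration under stated assumptions: `SortStepRowUtilSketch`, `RowLayout`, `TempFileReaderSketch`, and `FinalMergerSketch` are hypothetical names, and the flat-row layout (dictionary dimensions, then no-dictionary dimensions, then measures) is assumed for the example rather than taken from CarbonData.

```scala
// Hypothetical sketch; names and row layout are assumptions, not CarbonData's real API.
object SortStepRowUtilSketch {

  /** Assumed flat-row layout: dictionary dims, then no-dictionary dims, then measures. */
  final case class RowLayout(dictDimCount: Int, noDictDimCount: Int, measureCount: Int) {
    def width: Int = dictDimCount + noDictDimCount + measureCount
  }

  /** The single place that knows how a flat row becomes a 3-part row. */
  def convertRow(flat: Array[Any], layout: RowLayout): Array[Array[Any]] = {
    require(flat.length == layout.width,
      s"expected ${layout.width} columns but got ${flat.length}")
    val (dictDims, rest) = flat.splitAt(layout.dictDimCount)
    val (noDictDims, measures) = rest.splitAt(layout.noDictDimCount)
    Array(dictDims, noDictDims, measures)
  }
}

// Two hypothetical call sites that would otherwise each carry their own conversion.
object TempFileReaderSketch {
  def getRowFromStream(
      raw: Array[Any],
      layout: SortStepRowUtilSketch.RowLayout): Array[Array[Any]] =
    SortStepRowUtilSketch.convertRow(raw, layout)
}

object FinalMergerSketch {
  def convertRow(
      raw: Array[Any],
      layout: SortStepRowUtilSketch.RowLayout): Array[Array[Any]] =
    SortStepRowUtilSketch.convertRow(raw, layout)
}
```

If the row format changes later, only `convertRow` in the utility would need to be touched, which appears to be the point of the review comment.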
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/910#discussion_r121300326

    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSort.scala ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.spark.load
    +
    +import java.util.Comparator
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.datastore.row.CarbonRow
    +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
    +import org.apache.carbondata.processing.csvload.{CSVInputFormat, StringArrayWritable}
    +import org.apache.carbondata.processing.model.CarbonLoadModel
    +import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
    +import org.apache.carbondata.processing.sortandgroupby.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
    +import org.apache.carbondata.spark.util.CommonUtil
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.io.NullWritable
    +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
    +import org.apache.spark.{SparkContext, TaskContext}
    +import org.apache.spark.rdd.NewHadoopRDD
    +import org.apache.spark.sql.DataFrame
    +import org.apache.spark.sql.execution.command.ExecutionErrors
    +import org.apache.spark.storage.StorageLevel
    +
    +/**
    + * Use sortBy operator in spark to load the data
    + */
    +object GlobalSort {
    --- End diff --

Name it similar to `DataLoadProcessBuilder`
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/910#discussion_r121301479

    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortOperates.scala ---
    @@ -0,0 +1,242 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.spark.load
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
    +import org.apache.carbondata.core.datastore.row.CarbonRow
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.processing.csvload.StringArrayWritable
    +import org.apache.carbondata.processing.model.CarbonLoadModel
    +import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
    +import org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
    +import org.apache.carbondata.processing.newflow.parser.impl.RowParserImpl
    +import org.apache.carbondata.processing.newflow.sort.SortHelper
    +import org.apache.carbondata.processing.newflow.steps.{DataConverterProcessorStepImpl, DataWriterProcessorStepImpl}
    +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters
    +import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
    +import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.sql.Row
    +import org.apache.spark.util.LongAccumulator
    +import org.apache.spark.{SparkEnv, TaskContext}
    +
    +import scala.util.Random
    +
    +object GlobalSortOperates {
    --- End diff --

Name it similar to `AbstractDataLoadProcessorStep`, it contains process functions for all steps
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/910#discussion_r121302968

    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortOperates.scala ---
    @@ -0,0 +1,242 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.spark.load
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
    +import org.apache.carbondata.core.datastore.row.CarbonRow
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.processing.csvload.StringArrayWritable
    +import org.apache.carbondata.processing.model.CarbonLoadModel
    +import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
    +import org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
    +import org.apache.carbondata.processing.newflow.parser.impl.RowParserImpl
    +import org.apache.carbondata.processing.newflow.sort.SortHelper
    +import org.apache.carbondata.processing.newflow.steps.{DataConverterProcessorStepImpl, DataWriterProcessorStepImpl}
    +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters
    +import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
    +import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.sql.Row
    +import org.apache.spark.util.LongAccumulator
    +import org.apache.spark.{SparkEnv, TaskContext}
    +
    +import scala.util.Random
    +
    +object GlobalSortOperates {
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  def toStringArrayRow(row: StringArrayWritable, columnCount: Int): StringArrayRow = {
    +    val outRow = new StringArrayRow(new Array[String](columnCount))
    +    outRow.setValues(row.get())
    +  }
    +
    +  def toRDDIterator(
    +      rows: Iterator[Row],
    +      modelBroadcast: Broadcast[CarbonLoadModel]): Iterator[Array[AnyRef]] = {
    +    new Iterator[Array[AnyRef]] {
    +      val iter = new NewRddIterator(rows, modelBroadcast.value, TaskContext.get())
    +
    +      override def hasNext: Boolean = iter.hasNext
    +
    +      override def next(): Array[AnyRef] = iter.next
    +    }
    +  }
    +
    +  def inputFunc(
    +      rows: Iterator[Array[AnyRef]],
    +      index: Int,
    +      currentLoadCount: Int,
    +      modelBroadcast: Broadcast[CarbonLoadModel],
    +      rowNumber: LongAccumulator): Iterator[CarbonRow] = {
    +    val model: CarbonLoadModel = getModelCopy(index, currentLoadCount, modelBroadcast)
    +    val conf = DataLoadProcessBuilder.createConfiguration(model)
    +    val rowParser = new RowParserImpl(conf.getDataFields, conf)
    +
    +    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
    +      wrapException(e, model)
    +    }
    +
    +    new Iterator[CarbonRow] {
    +      override def hasNext: Boolean = rows.hasNext
    +
    +      override def next(): CarbonRow = {
    +        rowNumber.add(1)
    +        new CarbonRow(rowParser.parseRow(rows.next()))
    +      }
    +    }
    +  }
    +
    +  def convertFunc(
    +      rows: Iterator[CarbonRow],
    +      index: Int,
    +      currentLoadCount: Int,
    +      modelBroadcast: Broadcast[CarbonLoadModel],
    +      partialSuccessAccum: LongAccumulator,
    +      rowNumber: LongAccumulator): Iterator[CarbonRow] = {
    +    val model: CarbonLoadModel = getModelCopy(index, currentLoadCount, modelBroadcast)
    +    val conf = DataLoadProcessBuilder.createConfiguration(model)
    +    val badRecordLogger = DataConverterProcessorStepImpl.createBadRecordLogger(conf)
    +    val rowConverter = new RowConverterImpl(conf.getDataFields, conf, badRecordLogger)
    +    rowConverter.initialize()
    +
    +    TaskContext.get().addTaskCompletionListener { context =>
    +      DataConverterProcessorStepImpl.close(badRecordLogger, conf, rowConverter)
    +      GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum)
    +    }
    +
    +    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
    +      DataConverterProcessorStepImpl.close(badRecordLogger, conf, rowConverter)
    +      GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum)
    +
    +      wrapException(e, model)
    +    }
    +
    +    new Iterator[CarbonRow] {
    +      override def hasNext: Boolean = rows.hasNext
    +
    +      override def next(): CarbonRow = {
    +        rowNumber.add(1)
    +        rowConverter.convert(rows.next())
    +      }
    +    }
    +  }
    +
    +  def convertTo3Parts(
    +      rows: Iterator[CarbonRow],
    +      index: Int,
    +      currentLoadCount: Int,
    +      modelBroadcast: Broadcast[CarbonLoadModel],
    +      rowNumber: LongAccumulator): Iterator[CarbonRow] = {
    +    val model: CarbonLoadModel = getModelCopy(index, currentLoadCount, modelBroadcast)
    +    val conf = DataLoadProcessBuilder.createConfiguration(model)
    +    val sortParameters = SortParameters.createSortParameters(conf)
    +    val sortHelper = new SortHelper(sortParameters)
    +
    +    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
    +      wrapException(e, model)
    +    }
    +
    +    new Iterator[CarbonRow] {
    +      override def hasNext: Boolean = rows.hasNext
    +
    +      override def next(): CarbonRow = {
    +        rowNumber.add(1)
    +        new CarbonRow(sortHelper.convertRow(rows.next().getData))
    +      }
    +    }
    +  }
    +
    +  def writeFunc(
    +      rows: Iterator[CarbonRow],
    +      index: Int,
    +      currentLoadCount: Int,
    +      modelBroadcast: Broadcast[CarbonLoadModel],
    +      rowNumber: LongAccumulator) {
    +    var model: CarbonLoadModel = null
    +    var tableName: String = null
    +    var rowConverter: RowConverterImpl = null
    +
    +    try {
    +      model = getModelCopy(index, currentLoadCount, modelBroadcast)
    +      val storeLocation = getTempStoreLocation(index)
    +      val conf = DataLoadProcessBuilder.createConfiguration(model, storeLocation)
    +
    +      tableName = model.getTableName
    +
    +      // When we use sortBy, it means we have 2 stages. Stage1 can't get the finder from Stage2 directly because they
    +      // are in different processes. We need to set cardinality finder in Stage1 again.
    +      rowConverter = new RowConverterImpl(conf.getDataFields, conf, null)
    +      rowConverter.initialize()
    +      conf.setCardinalityFinder(rowConverter)
    +
    +      val dataWriter = new DataWriterProcessorStepImpl(conf)
    +
    +      val dataHandlerModel = dataWriter.getDataHandlerModel(0)
    +      var dataHandler: CarbonFactHandler = null
    +      var rowsNotExist = true
    +      while (rows.hasNext) {
    +        if (rowsNotExist) {
    +          rowsNotExist = false
    +          dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(dataHandlerModel,
    +            CarbonFactHandlerFactory.FactHandlerType.COLUMNAR)
    +          dataHandler.initialise()
    +        }
    +        rowNumber.add(1)
    +        dataWriter.processRow(rows.next(), dataHandler)
    +      }
    +
    +      if (!rowsNotExist) {
    +        dataWriter.finish(dataHandler)
    +      }
    +    } catch {
    +      case e: CarbonDataWriterException =>
    +        LOGGER.error(e, "Failed for table: " + tableName + " in Data Writer Step")
    +        throw new CarbonDataLoadingException("Error while initializing data handler : " + e.getMessage)
    +      case e: Exception =>
    +        LOGGER.error(e, "Failed for table: " + tableName + " in Data Writer Step")
    +        throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage, e)
    +    } finally {
    +      if (rowConverter != null) {
    +        rowConverter.finish()
    +      }
    +      // clean up the folders and files created locally for data load operation
    +      CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false)
    --- End diff --

I suggest moving this cleanup to the caller of `writeFunc`.
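One way to read the suggestion above is that the write function should only write, while the code that invokes it owns the temporary-folder cleanup in its own `finally`. The sketch below illustrates that split with plain Scala stand-ins; `CleanupInCallerSketch`, `runWriteTask`, and the `cleanup` callback are hypothetical names invented for the example, and no CarbonData or Spark API is used.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical sketch: plain-Scala stand-ins, not the CarbonData write step.
object CleanupInCallerSketch {

  /** Stand-in for the write step: it only writes rows and owns no cleanup. */
  def writeFunc(rows: Iterator[String], sink: ListBuffer[String]): Unit =
    rows.foreach { row =>
      if (row.isEmpty) throw new IllegalArgumentException("empty row")
      sink += row
    }

  /**
   * Stand-in for the caller: it runs the write step and guarantees that
   * the cleanup callback runs exactly once, on success and on failure.
   */
  def runWriteTask(rows: Iterator[String], sink: ListBuffer[String])(cleanup: () => Unit): Unit =
    try writeFunc(rows, sink)
    finally cleanup()
}
```

With this shape the cleanup policy can change (or be shared with other steps) without editing the write function itself.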
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/910#discussion_r121302875

    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortOperates.scala ---
    @@ -0,0 +1,242 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.spark.load
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
    +import org.apache.carbondata.core.datastore.row.CarbonRow
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.processing.csvload.StringArrayWritable
    +import org.apache.carbondata.processing.model.CarbonLoadModel
    +import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
    +import org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
    +import org.apache.carbondata.processing.newflow.parser.impl.RowParserImpl
    +import org.apache.carbondata.processing.newflow.sort.SortHelper
    +import org.apache.carbondata.processing.newflow.steps.{DataConverterProcessorStepImpl, DataWriterProcessorStepImpl}
    +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters
    +import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
    +import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.sql.Row
    +import org.apache.spark.util.LongAccumulator
    +import org.apache.spark.{SparkEnv, TaskContext}
    +
    +import scala.util.Random
    +
    +object GlobalSortOperates {
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  def toStringArrayRow(row: StringArrayWritable, columnCount: Int): StringArrayRow = {
    +    val outRow = new StringArrayRow(new Array[String](columnCount))
    +    outRow.setValues(row.get())
    +  }
    +
    +  def toRDDIterator(
    +      rows: Iterator[Row],
    +      modelBroadcast: Broadcast[CarbonLoadModel]): Iterator[Array[AnyRef]] = {
    +    new Iterator[Array[AnyRef]] {
    +      val iter = new NewRddIterator(rows, modelBroadcast.value, TaskContext.get())
    +
    +      override def hasNext: Boolean = iter.hasNext
    +
    +      override def next(): Array[AnyRef] = iter.next
    +    }
    +  }
    +
    +  def inputFunc(
    +      rows: Iterator[Array[AnyRef]],
    +      index: Int,
    +      currentLoadCount: Int,
    +      modelBroadcast: Broadcast[CarbonLoadModel],
    +      rowNumber: LongAccumulator): Iterator[CarbonRow] = {
    +    val model: CarbonLoadModel = getModelCopy(index, currentLoadCount, modelBroadcast)
    +    val conf = DataLoadProcessBuilder.createConfiguration(model)
    +    val rowParser = new RowParserImpl(conf.getDataFields, conf)
    +
    +    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
    +      wrapException(e, model)
    +    }
    +
    +    new Iterator[CarbonRow] {
    +      override def hasNext: Boolean = rows.hasNext
    +
    +      override def next(): CarbonRow = {
    +        rowNumber.add(1)
    --- End diff --

The count should be incremented after parsing the row; please change all `rowNumber.add(1)` places.
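The ordering issue raised above can be shown in isolation: if the counter is bumped before the row is parsed, a row whose parsing throws is still counted as processed. The sketch below is a minimal stand-alone illustration, using `java.util.concurrent.atomic.LongAdder` as a stand-in for Spark's `LongAccumulator` and a hypothetical `parse` function; none of these names come from the pull request.

```scala
import java.util.concurrent.atomic.LongAdder

// Hypothetical sketch: LongAdder stands in for Spark's LongAccumulator.
object CountAfterParseSketch {

  /** Stand-in parser; throws NumberFormatException on malformed input. */
  def parse(raw: String): Int = raw.trim.toInt

  /** Wraps an iterator so that only successfully parsed rows are counted. */
  def countingIterator(rows: Iterator[String], rowNumber: LongAdder): Iterator[Int] =
    new Iterator[Int] {
      override def hasNext: Boolean = rows.hasNext

      override def next(): Int = {
        val parsed = parse(rows.next()) // may throw; the counter is untouched in that case
        rowNumber.add(1)                // count only after parsing succeeded
        parsed
      }
    }
}
```

The same reordering would apply to the convert, sort, and write steps, where the increment would move to after the per-row work.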
Github user jackylk commented on the issue:
https://github.com/apache/carbondata/pull/910

Please create an umbrella JIRA to track the loading, query, and compaction requirements of this new feature.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/910#discussion_r121301080 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSort.scala --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.load + +import java.util.Comparator + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.row.CarbonRow +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails +import org.apache.carbondata.processing.csvload.{CSVInputFormat, StringArrayWritable} +import org.apache.carbondata.processing.model.CarbonLoadModel +import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder +import org.apache.carbondata.processing.sortandgroupby.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters} +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil +import org.apache.carbondata.spark.util.CommonUtil +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat +import 
org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.rdd.NewHadoopRDD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Use sortBy operator in spark to load the data
+ */
+object GlobalSort {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def loadDataUsingGlobalSort(
+      sc: SparkContext,
+      dataFrame: Option[DataFrame],
+      model: CarbonLoadModel,
+      currentLoadCount: Int): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    val originRDD = if (dataFrame.isDefined) {
+      dataFrame.get.rdd
+    } else {
+      // input data from files
+      val hadoopConfiguration = new Configuration()
+      CommonUtil.configureCSVInputFormat(hadoopConfiguration, model)
+      hadoopConfiguration.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
+      val columnCount = model.getCsvHeaderColumns.length
+      new NewHadoopRDD[NullWritable, StringArrayWritable](
+        sc,
+        classOf[CSVInputFormat],
+        classOf[NullWritable],
+        classOf[StringArrayWritable],
+        hadoopConfiguration)
+        .map(x => GlobalSortOperates.toStringArrayRow(x._2, columnCount))
+    }
+
+    val modelBroadcast = sc.broadcast(model)
+    val partialSuccessAccum = sc.longAccumulator("Partial Success Accumulator")
+
+    val inputStepRowNumber = sc.longAccumulator("Input Processor Accumulator")
+    val convertStepRowNumber = sc.longAccumulator("Convert Processor Accumulator")
+    val sortStepRowNumber = sc.longAccumulator("Sort Processor Accumulator")
+    val writeStepRowNumber = sc.longAccumulator("Write Processor Accumulator")
+
+    // 1. Input
+    val inputRDD = originRDD.mapPartitions(rows => GlobalSortOperates.toRDDIterator(rows, modelBroadcast))
+      .mapPartitionsWithIndex { case (index, rows) =>
+        GlobalSortOperates.inputFunc(rows, index, currentLoadCount, modelBroadcast, inputStepRowNumber)
+      }
+
+    // 2. Convert
+    val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
+      GlobalSortOperates.convertFunc(rows, index, currentLoadCount, modelBroadcast, partialSuccessAccum,
+        convertStepRowNumber)
+    }.filter(_ != null)// Filter the bad record
+
+    // 3. Sort
+    val configuration = DataLoadProcessBuilder.createConfiguration(model)
+    val sortParameters = SortParameters.createSortParameters(configuration)
+    object RowOrdering extends Ordering[Array[AnyRef]] {
+      def compare(rowA: Array[AnyRef], rowB: Array[AnyRef]): Int = {
+        val rowComparator: Comparator[Array[AnyRef]] =
+          if (sortParameters.getNoDictionaryCount > 0) {
+            new NewRowComparator(sortParameters.getNoDictionaryDimnesionColumn)
+          } else {
+            new NewRowComparatorForNormalDims(sortParameters.getDimColCount)
+          }
+
+        rowComparator.compare(rowA, rowB)
+      }
+    }
+
+    var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions(configuration)
+    if (numPartitions <= 0) {
+      numPartitions = convertRDD.partitions.length// TODO
+    }
+
+    // Because if the number of partitions greater than 1, there will be action operator(sample) in sortBy operator.
+    // So here we cache the rdd to avoid do input and convert again.
+    if (numPartitions > 1) {
+      convertRDD.persist(StorageLevel.MEMORY_AND_DISK)
+    }
+
+    import scala.reflect.classTag
+    val sortRDD =
+      convertRDD.sortBy(_.getData, numPartitions = numPartitions)(RowOrdering, classTag[Array[AnyRef]])
+        .mapPartitionsWithIndex { case (index, rows) =>
+          GlobalSortOperates.convertTo3Parts(rows, index, currentLoadCount, modelBroadcast, sortStepRowNumber)
+        }
+
+    // 4. Write
+    sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+      GlobalSortOperates.writeFunc(rows, context.partitionId, currentLoadCount, modelBroadcast, writeStepRowNumber))
+
+    // clean cache
+    convertRDD.unpersist()
+
+    // Log the number of rows in each step
+    LOGGER.audit("Total rows processed in step Input Processor: " + inputStepRowNumber.value)
--- End diff --

This should not use the `audit` level; it should be `info` instead.
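The cache comment in the quoted diff depends on how a range-partitioned global sort works: a sample of the input is used to pick split points, rows are routed to the range they fall in, and each range is then sorted locally. Reading the partitions in order yields a globally sorted result. Because the sampling pass reads the input once before the real sort reads it again, the diff persists `convertRDD` when there is more than one partition. The following is a minimal plain-Scala sketch of that idea, with no Spark dependency. It is not Spark's `RangePartitioner` and not the PR's code; `GlobalSortSketch`, `boundaries`, `partitionOf` and `globalSort` are hypothetical names introduced only for illustration, and the whole input stands in for the sample.

```scala
// Conceptual sketch of a range-partitioned global sort (illustrative names only;
// this is neither Spark's nor CarbonData's implementation).
object GlobalSortSketch {

  // Pick up to numPartitions - 1 split points from a sorted sample of the input.
  def boundaries(sample: Seq[Int], numPartitions: Int): Seq[Int] = {
    val sorted = sample.sorted
    if (sorted.isEmpty || numPartitions <= 1) Seq.empty[Int]
    else (1 until numPartitions).map(i => sorted(i * sorted.length / numPartitions)).distinct
  }

  // A row goes to the first range whose upper bound is >= the row;
  // rows above every bound go to the last range.
  def partitionOf(row: Int, bounds: Seq[Int]): Int = {
    val idx = bounds.indexWhere(row <= _)
    if (idx < 0) bounds.length else idx
  }

  // First pass: derive the split points (here the whole input acts as the sample).
  // Second pass: route every row to its range, then sort each range locally.
  def globalSort(rows: Seq[Int], numPartitions: Int): Seq[Seq[Int]] = {
    val bounds = boundaries(rows, numPartitions)
    val grouped = rows.groupBy(partitionOf(_, bounds))
    (0 to bounds.length).map(p => grouped.getOrElse(p, Seq.empty[Int]).sorted)
  }
}
```

For example, `GlobalSortSketch.globalSort(Seq(5, 3, 9, 1, 7, 2, 8), 3)` splits the rows into the ranges (1, 2, 3), (5, 7) and (8, 9). In the sketch the input is traversed twice, once for the split points and once for routing, which is the double read that persisting the converted rows is meant to avoid in the diff.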
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/910#discussion_r121300832
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.load
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger
+import org.apache.spark.util.LongAccumulator
+
+object GlobalSortHelper {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def badRecordsLogger(loadModel: CarbonLoadModel, badRecordsAccum: LongAccumulator): Unit = {
+    val key = new CarbonTableIdentifier(loadModel.getDatabaseName, loadModel.getTableName, null).getBadRecordLoggerKey
+    if (null != BadRecordsLogger.hasBadRecord(key)) {
+      LOGGER.error("Data Load is partially success for table " + loadModel.getTableName)
+      badRecordsAccum.add(1)
+    } else {
+      LOGGER.info("Data loading is successful for table " + loadModel.getTableName)
+    }
+  }
+
+  def tryWithSafeFinally[T](tableName: String, block: => T)
--- End diff --

It seems no one is using this function.
Github user watermen commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/910#discussion_r121304371
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala ---
[...]
+  def tryWithSafeFinally[T](tableName: String, block: => T)
--- End diff --

Delete
Github user watermen commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/910#discussion_r121304254
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSort.scala ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.load
+
+import java.util.Comparator
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.row.CarbonRow
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.processing.csvload.{CSVInputFormat, StringArrayWritable}
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
+import org.apache.carbondata.processing.sortandgroupby.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.rdd.NewHadoopRDD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Use sortBy operator in spark to load the data
+ */
+object GlobalSort {
--- End diff --

Done