[ https://issues.apache.org/jira/browse/CARBONDATA-2?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15641640#comment-15641640 ] ASF GitHub Bot commented on CARBONDATA-2: ----------------------------------------- Github user ravipesala commented on a diff in the pull request: https://github.com/apache/incubator-carbondata/pull/263#discussion_r86684268 --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.rdd + +import java.lang.Long +import java.text.SimpleDateFormat +import java.util +import java.util.{Date, UUID} + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce.RecordReader +import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} +import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, CarbonSerializableConfiguration} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.execution.command.Partitioner +import org.apache.spark.util.SerializableConfiguration + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.common.logging.impl.StandardLogService +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails} +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory +import org.apache.carbondata.hadoop.csv.CSVInputFormat +import org.apache.carbondata.hadoop.io.StringArrayWritable +import org.apache.carbondata.processing.graphgenerator.GraphGenerator +import org.apache.carbondata.spark.DataLoadResult +import org.apache.carbondata.spark.load._ +import org.apache.carbondata.spark.splits.TableSplit +import org.apache.carbondata.spark.util.CarbonQueryUtil + +/** + * It loads the data to carbon using @AbstractDataLoadProcessorStep + */ +class NewCarbonDataLoadRDD[K, V]( + sc: SparkContext, + result: DataLoadResult[K, V], + carbonLoadModel: CarbonLoadModel, + var storeLocation: String, + hdfsStoreLocation: String, + kettleHomePath: String, + partitioner: Partitioner, + columinar: Boolean, + loadCount: Integer, + tableCreationTime: Long, + schemaLastUpdatedTime: Long, + blocksGroupBy: Array[(String, Array[BlockDetails])], + isTableSplitPartition: Boolean) + extends RDD[(K, V)](sc, Nil) with CarbonHadoopMapReduceUtil with Logging { + + sc.setLocalProperty("spark.scheduler.pool", "DDL") + + private val jobTrackerId: String = { + val formatter = new SimpleDateFormat("yyyyMMddHHmm") + formatter.format(new Date()) + } + + // A Hadoop Configuration can be about 10 KB, which is pretty big, 
so broadcast it + private val confBroadcast = + sc.broadcast(new CarbonSerializableConfiguration(sc.hadoopConfiguration)) + + override def getPartitions: Array[Partition] = { + if (isTableSplitPartition) { + // for table split partition + var splits = Array[TableSplit]() + + if (carbonLoadModel.isDirectLoad) { + splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath, + partitioner.nodeList, partitioner.partitionCount) + } + else { + splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName, + carbonLoadModel.getTableName, null, partitioner) + } + + splits.zipWithIndex.map { s => + // filter the same partition unique id, because only one will match, so get 0 element + val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p => + p._1 == s._1.getPartition.getUniqueID)(0)._2 + new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails) + } + } else { + // for node partition + blocksGroupBy.zipWithIndex.map { b => + new CarbonNodePartition(id, b._2, b._1._1, b._1._2) + } + } + } + + override def checkpoint() { + // Do nothing. Hadoop RDD should not be checkpointed. + } + + override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + val iter = new Iterator[(K, V)] { + var partitionID = "0" + val loadMetadataDetails = new LoadMetadataDetails() + var model: CarbonLoadModel = _ + var uniqueLoadStatusId = + carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index + try { + loadMetadataDetails.setPartitionCount(partitionID) + loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) + + carbonLoadModel.setSegmentId(String.valueOf(loadCount)) + val recordReaders = getRecordReaders + val loader = new SparkPartitionLoader(model, + theSplit.index, + hdfsStoreLocation, + kettleHomePath, + loadCount, + loadMetadataDetails) + // Intialize to set carbon properties + loader.initialize() + loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS) + CarbonLoaderUtil.executeNewDataLoad(model, + loader.storeLocation, + hdfsStoreLocation, + recordReaders) + } catch { + case e: Exception => + logInfo("DataLoad failure") + LOGGER.error(e) + throw e + } + + def getRecordReaders: Array[RecordReader[NullWritable, StringArrayWritable]] = { + val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, theSplit.index, 0) + val configuration: Configuration = confBroadcast.value.value + configureCSVInputFormat(configuration) + val hadoopAttemptContext = newTaskAttemptContext(configuration, attemptId) + val format = new CSVInputFormat + if (isTableSplitPartition) { + // for table split partition + val split = theSplit.asInstanceOf[CarbonTableSplitPartition] + logInfo("Input split: " + split.serializableHadoopSplit.value) + carbonLoadModel.setTaskNo(String.valueOf(theSplit.index)) + if (carbonLoadModel.isDirectLoad) { + model = carbonLoadModel.getCopyWithPartition( + split.serializableHadoopSplit.value.getPartition.getUniqueID, + split.serializableHadoopSplit.value.getPartition.getFilesPath, + carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter) + } else { + model = carbonLoadModel.getCopyWithPartition( + split.serializableHadoopSplit.value.getPartition.getUniqueID) + } + partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID + + StandardLogService.setThreadName(partitionID, null) + CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap( + 
partitionID, split.partitionBlocksDetail.length)
+        val readers =
+          split.partitionBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
+        readers.zipWithIndex
+          .foreach(f => f._1.initialize(split.partitionBlocksDetail(f._2), hadoopAttemptContext))
+        readers
+      } else {
+        // for node partition
+        val split = theSplit.asInstanceOf[CarbonNodePartition]
+        logInfo("Input split: " + split.serializableHadoopSplit)
+        logInfo("The Block Count in this node :" + split.nodeBlocksDetail.length)
+        CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
+          split.serializableHadoopSplit, split.nodeBlocksDetail.length)
+        val blocksID = gernerateBlocksID
+        carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+        // set this node blocks info to global static map
+        GraphGenerator.blockInfo.put(blocksID, split.nodeBlocksDetail)
--- End diff --

removed

> Remove kettle for loading data
> ------------------------------
>
> Key: CARBONDATA-2
> URL: https://issues.apache.org/jira/browse/CARBONDATA-2
> Project: CarbonData
> Issue Type: Improvement
> Components: data-load
> Reporter: Liang Chen
> Priority: Critical
> Labels: features
> Fix For: 0.3.0-incubating
>
> Attachments: CarbonDataLoadingdesign.pdf
>
>
> Remove kettle for loading data module

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
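
A note on the confBroadcast field in the diff above: Hadoop's Configuration is not Java-serializable and is large enough (around 10 KB) that shipping it with every task closure is wasteful, so the RDD wraps it and broadcasts it once per load. The sketch below illustrates that general pattern; the SerializableConfigurationHolder class and the example job are illustrative assumptions, not code from this pull request (which uses CarbonSerializableConfiguration for the same purpose).

import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext

// Hypothetical holder for illustration; the PR's CarbonSerializableConfiguration
// (mirroring Spark's SerializableConfiguration) plays the same role.
class SerializableConfigurationHolder(@transient var value: Configuration) extends Serializable {

  // Configuration is a Hadoop Writable, so delegate to its own write/readFields
  // instead of default Java field serialization.
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}

object BroadcastConfSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "broadcast-conf-sketch")

    // Broadcast the wrapped Hadoop configuration once, instead of capturing it
    // in every task closure.
    val confBroadcast = sc.broadcast(new SerializableConfigurationHolder(sc.hadoopConfiguration))

    // Executors read the broadcast copy.
    val fsDefaults = sc.parallelize(1 to 4, 2)
      .map(_ => confBroadcast.value.value.get("fs.defaultFS", "file:///"))
      .collect()

    println(fsDefaults.mkString(", "))
    sc.stop()
  }
}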
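
For readers following getRecordReaders above: the kettle-free load path creates one Hadoop RecordReader per block in the split, builds a TaskAttemptContext from a timestamp-based job tracker id plus the split index, and initializes each reader before use. Below is a minimal sketch of that pattern, assuming plain Hadoop 2.x mapreduce APIs and the built-in TextInputFormat rather than CarbonData's CSVInputFormat; the file path and class names are hypothetical.

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

object RecordReaderSketch {

  // Create and initialize one reader for a split, using a timestamp-based
  // job tracker id and the split index, much like the RDD above does.
  def readerFor(split: InputSplit, conf: Configuration, taskIndex: Int)
    : RecordReader[LongWritable, Text] = {
    val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date())
    val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, taskIndex, 0)
    val context = new TaskAttemptContextImpl(conf, attemptId)

    val format = new TextInputFormat
    val reader = format.createRecordReader(split, context)
    reader.initialize(split, context) // must be called before nextKeyValue()
    reader
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Hypothetical input file; any local text/CSV file would do.
    val split = new FileSplit(new Path("/tmp/sample.csv"), 0L, Long.MaxValue, Array.empty[String])

    val reader = readerFor(split, conf, taskIndex = 0)
    try {
      while (reader.nextKeyValue()) {
        println(reader.getCurrentValue.toString) // one line of the file per record
      }
    } finally {
      reader.close()
    }
  }
}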