[ https://issues.apache.org/jira/browse/CARBONDATA-2?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15639605#comment-15639605 ] ASF GitHub Bot commented on CARBONDATA-2: ----------------------------------------- Github user jackylk commented on a diff in the pull request: https://github.com/apache/incubator-carbondata/pull/263#discussion_r86664395 --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.rdd + +import java.lang.Long +import java.text.SimpleDateFormat +import java.util +import java.util.{Date, UUID} + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce.RecordReader +import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} +import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, CarbonSerializableConfiguration} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.execution.command.Partitioner +import org.apache.spark.util.SerializableConfiguration + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.common.logging.impl.StandardLogService +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails} +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory +import org.apache.carbondata.hadoop.csv.CSVInputFormat +import org.apache.carbondata.hadoop.io.StringArrayWritable +import org.apache.carbondata.processing.graphgenerator.GraphGenerator +import org.apache.carbondata.spark.DataLoadResult +import org.apache.carbondata.spark.load._ +import org.apache.carbondata.spark.splits.TableSplit +import org.apache.carbondata.spark.util.CarbonQueryUtil + +/** + * It loads the data to carbon using @AbstractDataLoadProcessorStep + */ +class NewCarbonDataLoadRDD[K, V]( + sc: SparkContext, + result: DataLoadResult[K, V], + carbonLoadModel: CarbonLoadModel, + var storeLocation: String, + hdfsStoreLocation: String, + kettleHomePath: String, + partitioner: Partitioner, + columinar: Boolean, + loadCount: Integer, + tableCreationTime: Long, + schemaLastUpdatedTime: Long, + blocksGroupBy: Array[(String, Array[BlockDetails])], + isTableSplitPartition: Boolean) + extends RDD[(K, V)](sc, Nil) with CarbonHadoopMapReduceUtil with Logging { + + 
sc.setLocalProperty("spark.scheduler.pool", "DDL") + + private val jobTrackerId: String = { + val formatter = new SimpleDateFormat("yyyyMMddHHmm") + formatter.format(new Date()) + } + + // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it + private val confBroadcast = + sc.broadcast(new CarbonSerializableConfiguration(sc.hadoopConfiguration)) + + override def getPartitions: Array[Partition] = { + if (isTableSplitPartition) { + // for table split partition + var splits = Array[TableSplit]() + + if (carbonLoadModel.isDirectLoad) { + splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath, + partitioner.nodeList, partitioner.partitionCount) + } + else { --- End diff -- Move `else {` up one line, onto the same line as the preceding closing brace (i.e. `} else {`). > Remove kettle for loading data > ------------------------------ > > Key: CARBONDATA-2 > URL: https://issues.apache.org/jira/browse/CARBONDATA-2 > Project: CarbonData > Issue Type: Improvement > Components: data-load > Reporter: Liang Chen > Priority: Critical > Labels: features > Fix For: 0.3.0-incubating > > Attachments: CarbonDataLoadingdesign.pdf > > > Remove kettle for loading data module -- This message was sent by Atlassian JIRA (v6.3.4#6332) |
Free forum by Nabble | Edit this page |