[jira] [Commented] (CARBONDATA-2) Remove kettle for loading data

Akash R Nilugal (Jira)

    [ https://issues.apache.org/jira/browse/CARBONDATA-2?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15639584#comment-15639584 ]

ASF GitHub Bot commented on CARBONDATA-2:
-----------------------------------------

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/263#discussion_r86664148
 
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---
    @@ -0,0 +1,281 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.spark.rdd
    +
    +import java.lang.Long
    +import java.text.SimpleDateFormat
    +import java.util
    +import java.util.{Date, UUID}
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.io.NullWritable
    +import org.apache.hadoop.mapreduce.RecordReader
    +import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
    +import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, CarbonSerializableConfiguration}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.execution.command.Partitioner
    +import org.apache.spark.util.SerializableConfiguration
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.common.logging.impl.StandardLogService
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
    +import org.apache.carbondata.hadoop.csv.CSVInputFormat
    +import org.apache.carbondata.hadoop.io.StringArrayWritable
    +import org.apache.carbondata.processing.graphgenerator.GraphGenerator
    +import org.apache.carbondata.spark.DataLoadResult
    +import org.apache.carbondata.spark.load._
    +import org.apache.carbondata.spark.splits.TableSplit
    +import org.apache.carbondata.spark.util.CarbonQueryUtil
    +
    +/**
    + * It loads the data to carbon using @AbstractDataLoadProcessorStep
    + */
    +class NewCarbonDataLoadRDD[K, V](
    +    sc: SparkContext,
    +    result: DataLoadResult[K, V],
    +    carbonLoadModel: CarbonLoadModel,
    +    var storeLocation: String,
    +    hdfsStoreLocation: String,
    +    kettleHomePath: String,
    +    partitioner: Partitioner,
    +    columinar: Boolean,
    +    loadCount: Integer,
    +    tableCreationTime: Long,
    +    schemaLastUpdatedTime: Long,
    +    blocksGroupBy: Array[(String, Array[BlockDetails])],
    +    isTableSplitPartition: Boolean)
    +  extends RDD[(K, V)](sc, Nil) with CarbonHadoopMapReduceUtil with Logging {
    +
    +  sc.setLocalProperty("spark.scheduler.pool", "DDL")
    +
    +  private val jobTrackerId: String = {
    +    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    +    formatter.format(new Date())
    +  }
    +
    +  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
    +  private val confBroadcast =
    +    sc.broadcast(new CarbonSerializableConfiguration(sc.hadoopConfiguration))
    +
    +  override def getPartitions: Array[Partition] = {
    +    if (isTableSplitPartition) {
    +      // for table split partition
    +      var splits = Array[TableSplit]()
    --- End diff --
   
    unnecessary initialization
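
The diff is cut off right after the flagged line, so the following is only a sketch of what the comment most likely points at, assuming `splits` is reassigned shortly afterwards. `TableSplit` here is a hypothetical stand-in case class, not the real `org.apache.carbondata.spark.splits.TableSplit`, and `computeSplits` is an invented helper standing in for whatever actually produces the splits.

```scala
// Hypothetical stand-in for org.apache.carbondata.spark.splits.TableSplit
final case class TableSplit(name: String)

object SplitsExample {
  // Hypothetical helper: stands in for whatever really computes the splits
  def computeSplits(): Array[TableSplit] =
    Array(TableSplit("p0"), TableSplit("p1"))

  // Pattern flagged in the review: allocate an empty array, then overwrite it
  def withRedundantInit(): Array[TableSplit] = {
    var splits = Array[TableSplit]()
    splits = computeSplits()
    splits
  }

  // Possible alternative: bind the computed result once with a val
  def withVal(): Array[TableSplit] = {
    val splits = computeSplits()
    splits
  }

  def main(args: Array[String]): Unit = {
    // Both variants return the same elements; only the extra allocation differs
    assert(withRedundantInit().sameElements(withVal()))
  }
}
```

If the value really is assigned exactly once, a `val` avoids both the throwaway empty array and the mutable binding. If it is built up conditionally, the initialization may still be needed.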


> Remove kettle for loading data
> ------------------------------
>
>                 Key: CARBONDATA-2
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-2
>             Project: CarbonData
>          Issue Type: Improvement
>          Components: data-load
>            Reporter: Liang Chen
>            Priority: Critical
>              Labels: features
>             Fix For: 0.3.0-incubating
>
>         Attachments: CarbonDataLoadingdesign.pdf
>
>
> Remove kettle for loading data module



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)