Login  Register

Re: [Serious Issue] Rows disappeared

Posted by aaron on Sep 27, 2018; 2:07pm
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/Serious-Issue-Query-get-inconsistent-result-on-carbon1-5-0-tp63691p64033.html

This is the method I use to construct the Carbon session instance; I hope this helps you.

/** Builds a CarbonData-enabled SparkSession configured for an HDFS store.
  *
  * @param appName     Spark application name.
  * @param masterUrl   Spark master URL.
  * @param parallelism value used for core counts, shuffle partitions and loader threads
  *                    (passed through as a string).
  * @param logLevel    Spark log level applied to the created context.
  * @param hdfsUrl     base HDFS URL; store, bad-records and lock paths are derived from it.
  * @return the configured CarbonData SparkSession.
  */
def carbonSession(appName: String, masterUrl: String, parallelism: String,
                  logLevel: String,
                  hdfsUrl: String = "hdfs://xxx:9000"): SparkSession = {
  val dataPath = s"${hdfsUrl}/usr/carbon/data"

  // Global Carbon properties, applied in declaration order.
  // Previously tried but currently disabled options:
  //   ENABLE_AUTO_HANDOFF -> "true", TABLE_BLOCKLET_SIZE -> "64"
  val settings: Seq[(String, String)] = Seq(
    CarbonCommonConstants.STORE_LOCATION                    -> dataPath,
    CarbonCommonConstants.ENABLE_UNSAFE_SORT                -> "true",
    CarbonCommonConstants.ENABLE_OFFHEAP_SORT               -> "true",
    CarbonCommonConstants.CARBON_TASK_DISTRIBUTION          -> CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_BLOCKLET,
    CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION  -> "false",
    CarbonCommonConstants.ENABLE_VECTOR_READER              -> "true",
    CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE            -> "true",
    CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD -> "4,3",
    CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT           -> "0",
    CarbonCommonConstants.CARBON_BADRECORDS_LOC             -> s"${hdfsUrl}/usr/carbon/badrecords",
    CarbonCommonConstants.CARBON_QUERY_MIN_MAX_ENABLED      -> "true",
    CarbonCommonConstants.ENABLE_QUERY_STATISTICS           -> "false",
    CarbonCommonConstants.ENABLE_DATA_LOADING_STATISTICS    -> "false",
    CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME          -> "2", // 2 minutes
    CarbonCommonConstants.LOCK_TYPE                         -> "HDFSLOCK",
    CarbonCommonConstants.LOCK_PATH                         -> s"${hdfsUrl}/usr/carbon/lock",
    CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD   -> s"${parallelism}",
    CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT -> "100",
    CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS       -> s"${parallelism}",
    CarbonCommonConstants.LOAD_SORT_SCOPE                   -> "LOCAL_SORT",
    CarbonCommonConstants.NUM_CORES_COMPACTING              -> s"${parallelism}",
    CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB       -> "4096",
    CarbonCommonConstants.NUM_CORES_LOADING                 -> s"${parallelism}",
    CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE      -> "1024",
    CarbonCommonConstants.BLOCKLET_SIZE                     -> "64"
  )

  val carbonProps = CarbonProperties.getInstance()
  settings.foreach { case (key, value) => carbonProps.addProperty(key, value) }

  // Brings getOrCreateCarbonSession into scope on the builder.
  import org.apache.spark.sql.CarbonSession._

  val session = SparkSession
    .builder()
    .master(masterUrl)
    .appName(appName)
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.dfs.replication", 1)
    .config("spark.cores.max", s"${parallelism}")
    .getOrCreateCarbonSession(dataPath)

  // Also set replication on the live Hadoop configuration, not just via spark.hadoop.*.
  session.sparkContext.hadoopConfiguration.setInt("dfs.replication", 1)

  // Session-level SQL settings.
  Seq(
    s"SET spark.default.parallelism=${parallelism}",
    s"SET spark.sql.shuffle.partitions=${parallelism}",
    s"SET spark.sql.cbo.enabled=true",
    s"SET carbon.options.bad.records.logger.enable=true"
  ).foreach(stmt => session.sql(stmt))

  session.sparkContext.setLogLevel(logLevel)
  session
}



--
Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/