[jira] [Comment Edited] (CARBONDATA-906) Always OOM error when importing large dataset (100 million rows)

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Comment Edited] (CARBONDATA-906) Always OOM error when importing large dataset (100 million rows)

Akash R Nilugal (Jira)

    [ https://issues.apache.org/jira/browse/CARBONDATA-906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15965585#comment-15965585 ]

Crabo Yang edited comment on CARBONDATA-906 at 4/12/17 9:17 AM:
----------------------------------------------------------------

1.oozie spark-opts
<spark-opts>
--jars rds.importer-1.0-SNAPSHOT.jar,carbondata_2.10-1.0.0-incubating-shade-hadoop2.6.0-cdh5.7.0.jar
--num-executors 12 --executor-cores 4 --executor-memory 13G
--conf spark.yarn.executor.memoryOverhead=5120
--conf spark.executor.heartbeatInterval=10000000
--conf spark.network.timeout=10000000
</spark-opts>

2.create script
CREATE TABLE IF NOT EXISTS dmp_trade(id STRING,buyerNick STRING,buyerAlipayNO STRING,clientType STRING,sellerNick STRING,receiverName STRING,receiverMobile STRING,receiverPhone STRING,receiverCountry STRING,receiverState STRING,receiverCity STRING,receiverDistrict STRING,receiverTown STRING,receiverAddress STRING,receiverZip STRING,status STRING,tradeFrom STRING,type STRING,stepTradeStatus STRING,shippingType STRING,title STRING,buyerMessage STRING,buyerMemo STRING,rxAuditStatus STRING,buyerEmail STRING,picPath STRING,shopPick STRING,creditCardFee STRING,markDesc STRING,sellerMemo STRING,invoiceName STRING,invoiceType STRING,tradeAttr STRING,esRange STRING,esDate STRING,osDate STRING,osRange STRING,o2oSnatchStatus STRING,market STRING,etType STRING,obs STRING,tradeOriginalJson STRING,point STRING,omniAttr STRING,omniParam STRING,identity STRING,omnichannelParam STRING,assembly STRING,tradeId BIGINT,itemId BIGINT,platFormId INT,num INT,sellerFlag INT,naSource INT,etShopId INT,forbidConsign INT,buyerFlag INT,topHold INT,nvoiceKind INT,payment STRING,price STRING,totalFee STRING,discountFee STRING,postFee STRING,stepPaidFee STRING,adjustFee STRING,buyerCodFee STRING,orderTaxFee STRING,couponFee STRING,paidCouponFee STRING,sellerRate STRING,buyerRate STRING,postGateDeclare STRING,crossBondedDeclare STRING,hasBuyerMessage STRING,hasPostFee STRING,isShShip STRING,created TIMESTAMP,payTime TIMESTAMP,modified TIMESTAMP,endTime TIMESTAMP,consignTime TIMESTAMP,estConTime TIMESTAMP) STORED BY 'carbondata';

3.carbon.properties
#Mandatory. Carbon Store path
carbon.storelocation=hdfs://master.nascent.com:8020/Opt/CarbonStore
#Base directory for Data files
carbon.ddl.base.hdfs.url=hdfs://master.nascent.com:8020/opt/data
#Path where the bad records are stored
carbon.badRecords.location=/opt/Carbon/Spark/badrecords
#Mandatory. path to kettle home
carbon.kettle.home=/usr/lib/spark/carbonlib/carbonplugins

carbon.load.use.batch.sort=true
enable.unsafe.sort=true
offheap.sort.chunk.size.inmb=1024
carbon.load.batch.sort.size.inmb=450
#File read buffer size used during sorting(in MB) :MIN=1:MAX=100
carbon.sort.file.buffer.size=10
#Rowset size exchanged between data load graph steps :MIN=500:MAX=1000000
carbon.graph.rowset.size=10000
#Number of cores to be used while data loading
carbon.number.of.cores.while.loading=6
#Record count to sort and write to temp intermediate files
carbon.sort.size=500000
#Algorithm for hashmap for hashkey calculation
carbon.enableXXHash=true
#Number of cores to be used for block sort while dataloading
#carbon.number.of.cores.block.sort=7
#max level cache size up to which the level cache will be loaded in memory
#carbon.max.level.cache.size=-1
#enable prefetch of data during merge sort while reading data from sort temp files in data loading
#carbon.merge.sort.prefetch=true

#Number of cores to be used while compacting
carbon.number.of.cores.while.compacting=8
#For minor compaction, Number of segments to be merged in stage 1, number of compacted segments to be merged in stage 2.
carbon.compaction.level.threshold=4,3
#default size (in MB) for major compaction to be triggered
carbon.major.compaction.size=1024
#Query Configuration
#Number of cores to be used for loading index into memory
carbon.number.of.cores=8
#Number of records to be in memory while querying :MIN=100000:MAX=240000
carbon.inmemory.record.size=120000
#Improves the performance of filter query
carbon.enable.quick.filter=false
##number of cores used to load the blocks in the driver
#no.of.cores.to.load.blocks.in.driver=10

#Extra Configuration
##Timestamp format of input data used for timestamp data type.
#carbon.timestamp.format=yyyy-MM-dd HH:mm:ss

##File write buffer size used during sorting.
#carbon.sort.file.write.buffer.size=10485760
##Locking mechanism for data loading on a table
carbon.lock.type=HDFSLOCK
##Minimum number of intermediate files after which the sort merge is started.
#carbon.sort.intermediate.files.limit=20
##space reserved in percentage for writing block meta data in carbon data file
#carbon.block.meta.size.reserved.percentage=10
##csv reading buffer size.
#carbon.csv.read.buffersize.byte=1048576
##To identify and apply compression for non-high cardinality columns
#high.cardinality.value=100000
##maximum no of threads used for reading intermediate files for final merging.
#carbon.merge.sort.reader.thread=3
##Carbon blocklet size. Note: this configuration cannot be changed once the store is generated
#carbon.blocklet.size=120000
##number of retries to get the metadata lock for loading data to table
#carbon.load.metadata.lock.retries=3
##Minimum blocklets needed for distribution.
#carbon.blockletdistribution.min.blocklet.size=10
##Interval between the retries to get the lock
#carbon.load.metadata.lock.retry.timeout.sec=5
##Temporary store location, By default it will take System.getProperty("java.io.tmpdir")
#carbon.tempstore.location=/opt/Carbon/TempStoreLoc
##data loading records count logger
#carbon.load.log.counter=500000

##to specify number of segments to be preserved from compaction
#carbon.numberof.preserve.segments=0
##To determine the loads of number of days to be compacted
#carbon.allowed.compaction.days=0
##To enable compaction while data loading
#carbon.enable.auto.load.merge=false
######## Query Configuration ########
##Maximum time allowed for one query to be executed.
max.query.execution.time=60
##Min max is a feature added to enhance query performance. To disable this feature, set it to false.
carbon.enableMinMax=true

##To enable/disable identify high cardinality during first data loading
#high.cardinality.identify.enable=true
##threshold to identify whether a column is high cardinality
#high.cardinality.threshold=1000000
##Percentage to identify whether column cardinality is more than configured percent of total row count
#high.cardinality.row.count.percentage=80
##The property to set the date to be considered as start date for calculating the timestamp.
#carbon.cutOffTimestamp=2000-01-01 00:00:00
##The property to set the timestamp (i.e. millis) conversion to the SECOND, MINUTE, HOUR or DAY level.
#carbon.timegranularity=SECOND


was (Author: crabo):
1.oozie spark-opts
<spark-opts>
--jars rds.importer-1.0-SNAPSHOT.jar,carbondata_2.10-1.0.0-incubating-shade-hadoop2.6.0-cdh5.7.0.jar
--num-executors 12 --executor-cores 4 --executor-memory 13G
--conf spark.yarn.executor.memoryOverhead=5120
--conf spark.executor.heartbeatInterval=10000000
--conf spark.network.timeout=10000000
</spark-opts>

2.create script
CREATE TABLE IF NOT EXISTS dmp_trade(id STRING,buyerNick STRING,buyerAlipayNO STRING,clientType STRING,sellerNick STRING,receiverName STRING,receiverMobile STRING,receiverPhone STRING,receiverCountry STRING,receiverState STRING,receiverCity STRING,receiverDistrict STRING,receiverTown STRING,receiverAddress STRING,receiverZip STRING,status STRING,tradeFrom STRING,type STRING,stepTradeStatus STRING,shippingType STRING,title STRING,buyerMessage STRING,buyerMemo STRING,rxAuditStatus STRING,buyerEmail STRING,picPath STRING,shopPick STRING,creditCardFee STRING,markDesc STRING,sellerMemo STRING,invoiceName STRING,invoiceType STRING,tradeAttr STRING,esRange STRING,esDate STRING,osDate STRING,osRange STRING,o2oSnatchStatus STRING,market STRING,etType STRING,obs STRING,tradeOriginalJson STRING,point STRING,omniAttr STRING,omniParam STRING,identity STRING,omnichannelParam STRING,assembly STRING,tradeId BIGINT,itemId BIGINT,platFormId INT,num INT,sellerFlag INT,naSource INT,etShopId INT,forbidConsign INT,buyerFlag INT,topHold INT,nvoiceKind INT,payment STRING,price STRING,totalFee STRING,discountFee STRING,postFee STRING,stepPaidFee STRING,adjustFee STRING,buyerCodFee STRING,orderTaxFee STRING,couponFee STRING,paidCouponFee STRING,sellerRate STRING,buyerRate STRING,postGateDeclare STRING,crossBondedDeclare STRING,hasBuyerMessage STRING,hasPostFee STRING,isShShip STRING,created TIMESTAMP,payTime TIMESTAMP,modified TIMESTAMP,endTime TIMESTAMP,consignTime TIMESTAMP,estConTime TIMESTAMP) STORED BY 'carbondata';

3.carbon.properties
#System Configuration
#Mandatory. Carbon Store path
carbon.storelocation=hdfs://master.nascent.com:8020/Opt/CarbonStore
#Base directory for Data files
carbon.ddl.base.hdfs.url=hdfs://master.nascent.com:8020/opt/data
#Path where the bad records are stored
carbon.badRecords.location=/opt/Carbon/Spark/badrecords
#Mandatory. path to kettle home
carbon.kettle.home=/usr/lib/spark/carbonlib/carbonplugins
#Performance Configuration
#DataLoading Configuration
carbon.load.use.batch.sort=true
enable.unsafe.sort=true
offheap.sort.chunk.size.inmb=1024
carbon.load.batch.sort.size.inmb=450
#File read buffer size used during sorting(in MB) :MIN=1:MAX=100
carbon.sort.file.buffer.size=10
#Rowset size exchanged between data load graph steps :MIN=500:MAX=1000000
carbon.graph.rowset.size=10000
#Number of cores to be used while data loading
carbon.number.of.cores.while.loading=6
#Record count to sort and write to temp intermediate files
carbon.sort.size=500000
#Algorithm for hashmap for hashkey calculation
carbon.enableXXHash=true
#Number of cores to be used for block sort while dataloading
#carbon.number.of.cores.block.sort=7
#max level cache size up to which the level cache will be loaded in memory
#carbon.max.level.cache.size=-1
#enable prefetch of data during merge sort while reading data from sort temp files in data loading
#carbon.merge.sort.prefetch=true
#Compaction Configuration
#Number of cores to be used while compacting
carbon.number.of.cores.while.compacting=8
#For minor compaction, Number of segments to be merged in stage 1, number of compacted segments to be merged in stage 2.
carbon.compaction.level.threshold=4,3
#default size (in MB) for major compaction to be triggered
carbon.major.compaction.size=1024
#Query Configuration
#Number of cores to be used for loading index into memory
carbon.number.of.cores=8
#Number of records to be in memory while querying :MIN=100000:MAX=240000
carbon.inmemory.record.size=120000
#Improves the performance of filter query
carbon.enable.quick.filter=false
##number of cores used to load the blocks in the driver
#no.of.cores.to.load.blocks.in.driver=10

#Extra Configuration
##Timestamp format of input data used for timestamp data type.
#carbon.timestamp.format=yyyy-MM-dd HH:mm:ss
######## Dataload Configuration ########
##File write buffer size used during sorting.
#carbon.sort.file.write.buffer.size=10485760
##Locking mechanism for data loading on a table
carbon.lock.type=HDFSLOCK
##Minimum number of intermediate files after which the sort merge is started.
#carbon.sort.intermediate.files.limit=20
##space reserved in percentage for writing block meta data in carbon data file
#carbon.block.meta.size.reserved.percentage=10
##csv reading buffer size.
#carbon.csv.read.buffersize.byte=1048576
##To identify and apply compression for non-high cardinality columns
#high.cardinality.value=100000
##maximum no of threads used for reading intermediate files for final merging.
#carbon.merge.sort.reader.thread=3
##Carbon blocklet size. Note: this configuration cannot be changed once the store is generated
#carbon.blocklet.size=120000
##number of retries to get the metadata lock for loading data to table
#carbon.load.metadata.lock.retries=3
##Minimum blocklets needed for distribution.
#carbon.blockletdistribution.min.blocklet.size=10
##Interval between the retries to get the lock
#carbon.load.metadata.lock.retry.timeout.sec=5
##Temporary store location, By default it will take System.getProperty("java.io.tmpdir")
#carbon.tempstore.location=/opt/Carbon/TempStoreLoc
##data loading records count logger
#carbon.load.log.counter=500000
######## Compaction Configuration ########
##to specify number of segments to be preserved from compaction
#carbon.numberof.preserve.segments=0
##To determine the loads of number of days to be compacted
#carbon.allowed.compaction.days=0
##To enable compaction while data loading
#carbon.enable.auto.load.merge=false
######## Query Configuration ########
##Maximum time allowed for one query to be executed.
max.query.execution.time=60
##Min max is a feature added to enhance query performance. To disable this feature, set it to false.
carbon.enableMinMax=true
######## Global Dictionary Configurations ########
##To enable/disable identify high cardinality during first data loading
#high.cardinality.identify.enable=true
##threshold to identify whether a column is high cardinality
#high.cardinality.threshold=1000000
##Percentage to identify whether column cardinality is more than configured percent of total row count
#high.cardinality.row.count.percentage=80
##The property to set the date to be considered as start date for calculating the timestamp.
#carbon.cutOffTimestamp=2000-01-01 00:00:00
##The property to set the timestamp (i.e. millis) conversion to the SECOND, MINUTE, HOUR or DAY level.
#carbon.timegranularity=SECOND

> Always OOM error when importing large dataset (100 million rows)
> ----------------------------------------------------------------
>
>                 Key: CARBONDATA-906
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-906
>             Project: CarbonData
>          Issue Type: Bug
>          Components: data-load
>    Affects Versions: 1.0.0-incubating
>            Reporter: Crabo Yang
>         Attachments: carbon.properties
>
>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at java.util.concurrent.ConcurrentHashMap$Segment.put(ConcurrentHashMap.java:457)
> at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1130)
> at org.apache.carbondata.core.cache.dictionary.ColumnReverseDictionaryInfo.addDataToDictionaryMap(ColumnReverseDictionaryInfo.java:101)
> at org.apache.carbondata.core.cache.dictionary.ColumnReverseDictionaryInfo.addDictionaryChunk(ColumnReverseDictionaryInfo.java:88)
> at org.apache.carbondata.core.cache.dictionary.DictionaryCacheLoaderImpl.fillDictionaryValuesAndAddToDictionaryChunks(DictionaryCacheLoaderImpl.java:113)
> at org.apache.carbondata.core.cache.dictionary.DictionaryCacheLoaderImpl.load(DictionaryCacheLoaderImpl.java:81)
> at org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCache.loadDictionaryData(AbstractDictionaryCache.java:236)
> at org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCache.checkAndLoadDictionaryData(AbstractDictionaryCache.java:186)
> at org.apache.carbondata.core.cache.dictionary.ReverseDictionaryCache.getDictionary(ReverseDictionaryCache.java:174)
> at org.apache.carbondata.core.cache.dictionary.ReverseDictionaryCache.get(ReverseDictionaryCache.java:67)
> at org.apache.carbondata.core.cache.dictionary.ReverseDictionaryCache.get(ReverseDictionaryCache.java:38)
> at org.apache.carbondata.processing.newflow.converter.impl.DictionaryFieldConverterImpl.<init>(DictionaryFieldConverterImpl.java:92)
> at org.apache.carbondata.processing.newflow.converter.impl.FieldEncoderFactory.createFieldEncoder(FieldEncoderFactory.java:77)
> at org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl.initialize(RowConverterImpl.java:102)
> at org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl.initialize(DataConverterProcessorStepImpl.java:69)
> at org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl.initialize(SortProcessorStepImpl.java:57)
> at org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl.initialize(DataWriterProcessorStepImpl.java:79)
> at org.apache.carbondata.processing.newflow.DataLoadExecutor.execute(DataLoadExecutor.java:45)
> at org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD$$anon$2.<init>(NewCarbonDataLoadRDD.scala:425)
> at org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD.compute(NewCarbonDataLoadRDD.scala:383)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)