GitHub user ajantha-bhat opened a pull request:
https://github.com/apache/carbondata/pull/2876

[CARBONDATA-3054] Fix Dictionary file cannot be read in S3a with CarbonDictionaryDecoder.doConsume() codeGen

**Problem:** In an S3a environment, when data that has dictionary files is queried, the dictionary file cannot be read from CarbonDictionaryDecoder.doConsume() codeGen even though the file is present.

**Cause:** CarbonDictionaryDecoder.doConsume() codeGen does not set the hadoop conf in the thread-local variable; only doExecute() sets it. Hence, when getDictionaryWrapper() is called from doConsume() codeGen, AbstractDictionaryCache.getDictionaryMetaCarbonFile() returns false for the fileExists() operation.

**Solution:** In CarbonDictionaryDecoder.doConsume() codeGen, set the hadoop conf in the thread-local variable.

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

- [ ] Any interfaces changed? NA
- [ ] Any backward compatibility impacted? NA
- [ ] Document update required? NA
- [ ] Testing done? Done.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. NA

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ajantha-bhat/carbondata master_new

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2876.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2876

----

commit 492823671d0aead8393a55f147d5e11c21e0bdfa
Author: ajantha-bhat <ajanthabhat@...>
Date:   2018-10-29T12:26:29Z

    [CARBONDATA-3054] Fix Dictionary file cannot be read in S3a with CarbonDictionaryDecoder.doConsume() codeGen

----

---
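For illustration, here is a minimal sketch of the core of the fix, assuming the `Broadcast[SerializableConfiguration]` and the `ThreadLocalSessionInfo.setConfigurationToCurrentThread` helper that appear in the review diffs below; the object name, the `bindConf` helper, and the import paths are assumptions for the sketch, not the committed change.

```scala
// Import paths assumed for this sketch (CarbonData 1.5.x / Spark 2.x era).
import org.apache.carbondata.core.util.ThreadLocalSessionInfo
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.SerializableConfiguration

object DictionaryDecoderConfBinding {
  // Register the broadcast hadoop conf in CarbonData's thread-local before any
  // dictionary file is read, mirroring what doExecute() already does. Without
  // this, AbstractDictionaryCache.getDictionaryMetaCarbonFile() runs with no
  // hadoop conf on the current thread, so its fileExists() check against S3a
  // returns false even though the dictionary file is present.
  def bindConf(broadcastConf: Broadcast[SerializableConfiguration]): Unit = {
    ThreadLocalSessionInfo.setConfigurationToCurrentThread(broadcastConf.value.value)
  }
}
```

The review discussion below is about where exactly this binding should happen in the codegen path.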
Github user ajantha-bhat commented on the issue:
https://github.com/apache/carbondata/pull/2876

@ravipesala, @kunal642: please review

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2876

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1122/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2876

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1334/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2876

Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9386/

---
Github user ajantha-bhat commented on the issue:
https://github.com/apache/carbondata/pull/2876

retest this please

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2876

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1126/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2876

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1339/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2876

Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9390/

---
Github user ajantha-bhat commented on the issue:
https://github.com/apache/carbondata/pull/2876

retest this please

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2876

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1135/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2876

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1347/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2876

Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9399/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2876

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1140/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2876

Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9405/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2876

Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1352/

---
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2876#discussion_r229208346

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---
@@ -137,14 +139,15 @@ case class CarbonDictionaryDecoder(
     val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
       cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
     val dicts: Seq[ForwardDictionaryWrapper] = getDictionaryWrapper(tableNameToCarbonTableMapping,
-      forwardDictionaryCache)
+      forwardDictionaryCache, conf)
     val exprs = child.output.map { exp =>
       ExpressionCanonicalizer.execute(BindReferences.bindReference(exp, child.output))
     }
     ctx.currentVars = input
     val resultVars = exprs.zipWithIndex.map { case (expr, index) =>
       if (dicts(index) != null) {
+        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
--- End diff --

Remove its usage from this place.

---
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2876#discussion_r229208742

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---
@@ -557,16 +561,20 @@ class CarbonDecoderRDD(
  * It is a wrapper around Dictionary, it is a work around to keep the dictionary serializable in
  * case of codegen
  * @param dictIdentifier Dictionary column unique identifier
+ * @param broadcastConf hadoop broadcast conf for serialization, that contains carbon conf.
  */
 class ForwardDictionaryWrapper(
-    dictIdentifier: DictionaryColumnUniqueIdentifier) extends Serializable {
+    dictIdentifier: DictionaryColumnUniqueIdentifier,
+    broadcastConf: Broadcast[SerializableConfiguration]) extends Serializable {

   var dictionary: Dictionary = null

   var dictionaryLoader: DictionaryLoader = _

   def getDictionaryValueForKeyInBytes (surrogateKey: Int): Array[Byte] = {
     if (dictionary == null) {
+      ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo
--- End diff --

Move this into the doConsume method inside `if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {`, as the wrapper is not the correct place for setting ThreadLocalSessionInfo.

---
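To make the suggested move concrete, a hedged sketch of the shape being asked for follows, reusing only identifiers quoted in this thread; `generateDecodeCode` and `passThroughCode` are hypothetical stand-ins for the existing codegen bodies, and the actual change may differ in detail.

```scala
// Sketch of the suggested placement: bind the thread-local conf once, behind
// the isRequiredToDecode guard in doConsume, instead of lazily inside
// ForwardDictionaryWrapper.getDictionaryValueForKeyInBytes.
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
  if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
    ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
    generateDecodeCode(ctx, input)   // hypothetical: existing decode codegen body
  } else {
    passThroughCode(ctx, input, row) // hypothetical: existing non-decode path
  }
}
```

The guard matters because the conf only needs to be bound when this plan node will actually decode dictionary columns.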
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2876#discussion_r229237604

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---
@@ -137,14 +139,15 @@ case class CarbonDictionaryDecoder(
     val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
       cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
     val dicts: Seq[ForwardDictionaryWrapper] = getDictionaryWrapper(tableNameToCarbonTableMapping,
-      forwardDictionaryCache)
+      forwardDictionaryCache, conf)
     val exprs = child.output.map { exp =>
       ExpressionCanonicalizer.execute(BindReferences.bindReference(exp, child.output))
     }
     ctx.currentVars = input
     val resultVars = exprs.zipWithIndex.map { case (expr, index) =>
       if (dicts(index) != null) {
+        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
--- End diff --

Done. Set in codeGen now.

---
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2876#discussion_r229237654

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---
@@ -557,16 +561,20 @@ class CarbonDecoderRDD(
  * It is a wrapper around Dictionary, it is a work around to keep the dictionary serializable in
  * case of codegen
  * @param dictIdentifier Dictionary column unique identifier
+ * @param broadcastConf hadoop broadcast conf for serialization, that contains carbon conf.
  */
 class ForwardDictionaryWrapper(
-    dictIdentifier: DictionaryColumnUniqueIdentifier) extends Serializable {
+    dictIdentifier: DictionaryColumnUniqueIdentifier,
+    broadcastConf: Broadcast[SerializableConfiguration]) extends Serializable {

   var dictionary: Dictionary = null

   var dictionaryLoader: DictionaryLoader = _

   def getDictionaryValueForKeyInBytes (surrogateKey: Int): Array[Byte] = {
     if (dictionary == null) {
+      ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo
--- End diff --

Done. Set in codeGen now.

---