Karan980 opened a new pull request #4083: URL: https://github.com/apache/carbondata/pull/4083

### Why is this PR needed?
When the data files of an SI segment are merged, the SI table ends up with more rows than the main table.

### What changes were proposed in this PR?
The CARBON_INPUT_SEGMENTS property was not set before creating the dataframe from the SI segment, so the dataframe was created from all rows in the table, not only from the particular segment.

### Does this PR introduce any user interface change?
- No

### Is any new testcase added?
- Yes

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: [hidden email]
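For readers following along, the fix described above reduces to scoping the current thread to one segment before the dataframe is built. A minimal sketch of that pattern, assembled from the APIs visible in this PR's diff (the `threadUnset` cleanup call is an assumption, not shown in the diff), and not runnable outside a CarbonData Spark session:

```scala
// Sketch (not standalone-runnable): scope the query to the given SI segments
// BEFORE building the dataframe; otherwise the plan reads every row of the
// table, which is exactly the bug this PR fixes.
def dataFrameOfSegmentsSketch(
    sparkSession: org.apache.spark.sql.SparkSession,
    carbonTable: CarbonTable, // CarbonData table handle, as in the diff
    projections: String,
    segments: Array[String]): org.apache.spark.sql.DataFrame = {
  val segmentProperty = CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
    carbonTable.getDatabaseName + CarbonCommonConstants.POINT + carbonTable.getTableName
  // restrict this thread's reads to the listed segments
  CarbonThreadUtil.threadSet(segmentProperty, segments.mkString(","))
  try {
    sparkSession.sql(
      s"select $projections from ${carbonTable.getDatabaseName}.${carbonTable.getTableName}")
  } finally {
    // assumed cleanup call: restore visibility of all segments for this thread
    CarbonThreadUtil.threadUnset(segmentProperty)
  }
}
```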
CarbonDataQA2 commented on pull request #4083: URL: https://github.com/apache/carbondata/pull/4083#issuecomment-769000449

Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5373/
CarbonDataQA2 commented on pull request #4083: URL: https://github.com/apache/carbondata/pull/4083#issuecomment-769008593

Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3613/
Indhumathi27 commented on a change in pull request #4083: URL: https://github.com/apache/carbondata/pull/4083#discussion_r567597957

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala

@@ -580,34 +581,39 @@ object SecondaryIndexCreator {
       sparkSession: SparkSession,
       carbonTable: CarbonTable,
       projections: String,
-      segments: Array[String]): DataFrame = {
+      segments: Array[String],
+      isPositionReferenceRequired: Boolean = false): DataFrame = {
     try {
       CarbonThreadUtil.threadSet(
         CarbonCommonConstants.CARBON_INPUT_SEGMENTS + carbonTable.getDatabaseName +
         CarbonCommonConstants.POINT + carbonTable.getTableName, segments.mkString(","))
       val logicalPlan = sparkSession.sql(
         s"select $projections from ${ carbonTable.getDatabaseName }.${
           carbonTable.getTableName}").queryExecution.logical
-      val positionId = UnresolvedAlias(Alias(UnresolvedFunction("getPositionId",
-        Seq.empty, isDistinct = false), CarbonCommonConstants.POSITION_ID)())
-      val newLogicalPlan = logicalPlan.transform {
-        case p: Project =>
-          Project(p.projectList :+ positionId, p.child)
-      }
-      val tableProperties = if (carbonTable.isHivePartitionTable) {
-        // in case of partition table, TableProperties object in carbonEnv is not same as
-        // in carbonTable object, so update from carbon env itself.
-        CarbonEnv.getCarbonTable(Some(carbonTable.getDatabaseName), carbonTable.getTableName)(
-          sparkSession).getTableInfo
-          .getFactTable
-          .getTableProperties
+      if (isPositionReferenceRequired) {

Review comment: please add comments about this check
Indhumathi27 commented on a change in pull request #4083: URL: https://github.com/apache/carbondata/pull/4083#discussion_r567598094

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala

@@ -678,9 +678,11 @@ object SecondaryIndexUtil {
         .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
       validSegments.foreach { segment =>
         outputModel.setSegmentId(segment.getSegmentNo)
-        val dataFrame = SparkSQLUtil.createInputDataFrame(
+        val dataFrame = SecondaryIndexCreator.dataFrameOfSegments(

Review comment: please add comments here also
Indhumathi27 commented on a change in pull request #4083: URL: https://github.com/apache/carbondata/pull/4083#discussion_r567602928

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala

@@ -580,34 +581,39 @@ object SecondaryIndexCreator {
       sparkSession: SparkSession,
       carbonTable: CarbonTable,
       projections: String,
-      segments: Array[String]): DataFrame = {
+      segments: Array[String],
+      isPositionReferenceRequired: Boolean = false): DataFrame = {
     try {
       CarbonThreadUtil.threadSet(
         CarbonCommonConstants.CARBON_INPUT_SEGMENTS + carbonTable.getDatabaseName +
         CarbonCommonConstants.POINT + carbonTable.getTableName, segments.mkString(","))
       val logicalPlan = sparkSession.sql(
         s"select $projections from ${ carbonTable.getDatabaseName }.${
           carbonTable.getTableName}").queryExecution.logical
-      val positionId = UnresolvedAlias(Alias(UnresolvedFunction("getPositionId",

Review comment: i think, instead you can check if isPositionReferenceRequired is false and return SparkSQLUtil.execute(logicalPlan, sparkSession) and keep old changes
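The suggestion above can be read as an early return inside `dataFrameOfSegments`. A hedged sketch of the resulting control flow, using only the API names quoted in this thread (a fragment, not standalone-runnable):

```scala
// Sketch of the reviewer's suggestion: only append the getPositionId()
// projection when a position reference is actually needed (SI load from the
// main table); for the SI data-file merge path, execute the plain
// segment-scoped plan as-is.
if (!isPositionReferenceRequired) {
  // plan is already restricted to the requested segments via CARBON_INPUT_SEGMENTS
  return SparkSQLUtil.execute(logicalPlan, sparkSession)
}
// otherwise keep the old behaviour: add the POSITION_ID column to the plan
val positionId = UnresolvedAlias(Alias(UnresolvedFunction("getPositionId",
  Seq.empty, isDistinct = false), CarbonCommonConstants.POSITION_ID)())
val newLogicalPlan = logicalPlan.transform {
  case p: Project => Project(p.projectList :+ positionId, p.child)
}
SparkSQLUtil.execute(newLogicalPlan, sparkSession)
```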
Karan980 commented on a change in pull request #4083: URL: https://github.com/apache/carbondata/pull/4083#discussion_r567678357

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala (same hunk as quoted in discussion_r567597957)

Review comment: Done
Karan980 commented on a change in pull request #4083: URL: https://github.com/apache/carbondata/pull/4083#discussion_r567678484

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala (same hunk as quoted in discussion_r567598094)

Review comment: Done
Karan980 commented on a change in pull request #4083: URL: https://github.com/apache/carbondata/pull/4083#discussion_r567678752

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala (same hunk as quoted in discussion_r567602928)

Review comment: Done
Indhumathi27 commented on a change in pull request #4083: URL: https://github.com/apache/carbondata/pull/4083#discussion_r567695612

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala

@@ -678,9 +678,14 @@ object SecondaryIndexUtil {
         .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
       validSegments.foreach { segment =>
         outputModel.setSegmentId(segment.getSegmentNo)
-        val dataFrame = SparkSQLUtil.createInputDataFrame(
+        // As this dataframe is created to merge the data files of SI segment. So no need
+        // to calculate positionReference column again, as it is already calculated during
+        // SI segment load from main table.

Review comment: please specify about SET input segments for segment
CarbonDataQA2 commented on pull request #4083: URL: https://github.com/apache/carbondata/pull/4083#issuecomment-770771606

Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5391/
CarbonDataQA2 commented on pull request #4083: URL: https://github.com/apache/carbondata/pull/4083#issuecomment-770777837

Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3631/
Karan980 commented on a change in pull request #4083: URL: https://github.com/apache/carbondata/pull/4083#discussion_r567760931

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala (same hunk as quoted in discussion_r567695612)

Review comment: Done
CarbonDataQA2 commented on pull request #4083: URL: https://github.com/apache/carbondata/pull/4083#issuecomment-770848486

Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5392/
CarbonDataQA2 commented on pull request #4083: URL: https://github.com/apache/carbondata/pull/4083#issuecomment-770851951

Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3633/
ajantha-bhat commented on pull request #4083: URL: https://github.com/apache/carbondata/pull/4083#issuecomment-771548605

LGTM
ajantha-bhat edited a comment on pull request #4083: URL: https://github.com/apache/carbondata/pull/4083#issuecomment-771551095

The title of the PR can be more specific, like "**Data mismatch issue in SI global sort merge scenario**". @Karan980: please consider this point next time; I have changed it while merging now.