nihal0107 opened a new pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015

### Why is this PR needed?
Currently, in the case of an `insert overwrite` query, the existing SI segments are not marked for delete. Because of this, a select query on the SI table returns wrong results.

### What changes were proposed in this PR?
Handled insert overwrite for the SI table and changed the status of the existing segments to `MARKED_FOR_DELETE`.

### Does this PR introduce any user interface change?
- No

### Is any new testcase added?
- Yes
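As a rough illustration of the fix: a minimal sketch, assuming the `SegmentStatusManager` and `SegmentStatus` APIs quoted in the review diffs later in this thread; `markForDelete` is a hypothetical stand-in for CarbonData's real status-update call, not the PR's actual code.

```scala
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}

// After the insert-overwrite load on the SI table succeeds, every segment
// that was not written by the overwrite is stale and must be invalidated,
// otherwise select queries on the SI table keep returning the old rows.
def invalidateOverwrittenSegments(metadataPath: String,
    markForDelete: Seq[String] => Unit): Unit = {
  val staleSegments = SegmentStatusManager.readLoadMetadata(metadataPath)
    .filter(_.getSegmentStatus != SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)
    .map(_.getLoadName)
    .toSeq
  if (staleSegments.nonEmpty) {
    markForDelete(staleSegments) // sets the status to MARKED_FOR_DELETE
  }
}
```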
CarbonDataQA2 commented on pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#issuecomment-731119279

Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3071/
CarbonDataQA2 commented on pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#issuecomment-731152016

Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4827/
CarbonDataQA2 commented on pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#issuecomment-731997987

Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4853/
CarbonDataQA2 commented on pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#issuecomment-732002235

Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3100/
akashrn5 commented on a change in pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#discussion_r530123834

########## File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithInsertOverwrite.scala ##########

@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex;
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterEach
+
+class TestSIWithInsertOverwrite extends QueryTest with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+    sql("drop table if exists maintable")
+    sql("create table maintable(name string, Id int, address string) stored as carbondata")
+    sql("drop index if exists maintable_si on maintable")
+    sql("CREATE INDEX maintable_si on table maintable (address) as 'carbondata'")
+  }
+
+  test("test insert overwrite with SI") {
+    sql("insert into maintable select 'nihal',1,'nko'")
+    sql("insert into maintable select 'brinjal',2,'valid'")
+    checkAnswer(sql("select count(*) from maintable_si WHERE address='nko'"), Seq(Row(1)))
+    checkAnswer(sql("select address from maintable_si"), Seq(Row("nko"), Row("valid")))

Review comment: The above two checkAnswer calls are not required, as this is basic behaviour that is already covered by many other test cases.

########## File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithInsertOverwrite.scala ##########

+  test("test insert overwrite with CTAS and SI") {
+    sql("insert into maintable select 'nihal',1,'nko'")
+    sql("drop table if exists ctas_maintable")
+    sql("CREATE TABLE ctas_maintable " +
+      "STORED AS carbondata as select * from maintable")
+    checkAnswer(sql("select count(*) from ctas_maintable"), Seq(Row(1)))
+    assert(sql("show indexes on table ctas_maintable").collect().isEmpty)
+    sql("CREATE INDEX ctas_maintable_si on table ctas_maintable (address) as 'carbondata'")

Review comment: Lines 48 and 49 are not required.

########## File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithInsertOverwrite.scala ##########

+    sql("insert overwrite table maintable select 'nihal', 1, 'asdfa'")
+    checkAnswer(sql("select count(*) from maintable_si WHERE address='nko'"), Seq(Row(0)))
+    checkAnswer(sql("select address from maintable_si"), Seq(Row("asdfa")))
+    checkAnswer(sql("select * from maintable"), Seq(Row("nihal", 1, "asdfa")))

Review comment: The `select * from maintable` check is not required.

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala ##########

@@ -299,6 +301,30 @@ object CarbonIndexUtil {
         .Map((carbonLoadModel.getSegmentId, carbonLoadModel.getFactTimeStamp))
     }
     val header = indexTable.getCreateOrderColumn.asScala.map(_.getColName).toArray
+    var segmentList = new ListBuffer[String]()
+    if (isInsertOverWrite) {
+      val loadMetadataDetails = carbonLoadModel.getLoadMetadataDetails.asScala
+      for (loadMetadata <- loadMetadataDetails) {
+        if (loadMetadata.getSegmentStatus != SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
+          segmentList += loadMetadata.getLoadName
+          segmentIdToLoadStartTimeMapping.put(loadMetadata.getLoadName,
+            loadMetadata.getLoadStartTime)
+        }
+      }
+      if (segmentList.nonEmpty) {

Review comment: In the main table load flow, we first mark the new segment as INSERT_OVERWRITE_IN_PROGRESS, then load the data into it, and only at the end mark the overwritten segments as marked for delete and the new segment as success. Please follow the same approach for SI. If you mark segments for delete at the beginning, concurrent clean-files operations and queries can cause issues; so, once the new SI segment is marked success, then mark all the overwritten segments for delete.
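To make the ordering the reviewer describes concrete, here is a minimal sketch (not the PR's actual code; `updateStatus` is a hypothetical stand-in for CarbonData's real status-update call):

```scala
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}

// Ordering for an SI insert overwrite that stays safe under concurrency:
// the new segment becomes SUCCESS before any old segment is invalidated,
// so readers never observe a window where both old and new data are hidden.
def finishSiOverwrite(metadataPath: String, newSegmentId: String,
    updateStatus: (String, SegmentStatus) => Unit): Unit = {
  // 1. Make the freshly written segment visible first.
  updateStatus(newSegmentId, SegmentStatus.SUCCESS)
  // 2. Only then mark every overwritten segment for delete.
  SegmentStatusManager.readLoadMetadata(metadataPath)
    .filter(_.getLoadName != newSegmentId)
    .foreach(d => updateStatus(d.getLoadName, SegmentStatus.MARKED_FOR_DELETE))
}
```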
CarbonDataQA2 commented on pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#issuecomment-733734632

Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3150/
CarbonDataQA2 commented on pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#issuecomment-733775936

Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4905/
nihal0107 commented on a change in pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#discussion_r530768947

########## File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithInsertOverwrite.scala ##########

+    checkAnswer(sql("select count(*) from maintable_si WHERE address='nko'"), Seq(Row(1)))
+    checkAnswer(sql("select address from maintable_si"), Seq(Row("nko"), Row("valid")))

Review comment: removed

+    checkAnswer(sql("select * from maintable"), Seq(Row("nihal", 1, "asdfa")))

Review comment: removed

+    sql("CREATE INDEX ctas_maintable_si on table ctas_maintable (address) as 'carbondata'")

Review comment: removed
nihal0107 commented on a change in pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#discussion_r530769072

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala ##########

+      if (segmentList.nonEmpty) {

Review comment: made the changes
akashrn5 commented on a change in pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#discussion_r532701657

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala ##########

@@ -299,6 +300,13 @@ object CarbonIndexUtil {
         .Map((carbonLoadModel.getSegmentId, carbonLoadModel.getFactTimeStamp))
     }
     val header = indexTable.getCreateOrderColumn.asScala.map(_.getColName).toArray
+    if (isInsertOverWrite) {
+      val loadMetadataDetails = carbonLoadModel.getLoadMetadataDetails.asScala
+      for (loadMetadata <- loadMetadataDetails) {

Review comment: Do not use the traditional way of looping; use the Scala functional way, `.foreach`.

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala ##########

@@ -79,13 +78,16 @@
           .lookupRelation(Some(carbonLoadModel.getDatabaseName),
             indexTableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
+        val isInsertOverwrite = (operationContext.getProperties

Review comment: Remove the unnecessary brackets.

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala ##########

@@ -385,7 +392,26 @@ object SecondaryIndexCreator {
         val rebuiltSegments = SecondaryIndexUtil
           .mergeDataFilesSISegments(secondaryIndexModel.segmentIdToLoadStartTimeMapping,
             indexCarbonTable,
-            loadMetadataDetails.toList.asJava, carbonLoadModelForMergeDataFiles)(sc)
+            loadMetadataDetail.toList.asJava, carbonLoadModelForMergeDataFiles)(sc)
+        if (isInsertOverwrite) {
+          var segmentList = new ListBuffer[String]()

Review comment: Rename this to something more meaningful that describes what the list holds; is it required at all?

+        if (isInsertOverwrite) {
+          var segmentList = new ListBuffer[String]()
+          for (loadMetadata <- loadMetadataDetails) {
+            if (loadMetadata.getSegmentStatus != SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
+              segmentList += loadMetadata.getLoadName
+            }
+          }
+          if (segmentList.nonEmpty) {

Review comment: Add a comment here.

+          for (loadMetadata <- loadMetadataDetails) {

Review comment: Same as above: use `.foreach`.

@@ -371,11 +377,12 @@ object SecondaryIndexCreator {
         val loadMetadataDetails = SegmentStatusManager
           .readLoadMetadata(indexCarbonTable.getMetadataPath)
-          .filter(loadMetadataDetail => successSISegments.contains(loadMetadataDetail.getLoadName))
+        val loadMetadataDetail = loadMetadataDetails

Review comment: Better to rename it; otherwise it will be confusing.

+          if (segmentList.nonEmpty) {

Review comment: Please change the status to marked for delete only after marking the new insert-overwrite segment as success; otherwise we may hit reliability issues in concurrent scenarios.
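For reference, a sketch of the `.foreach`/`filter` style being asked for, using the `LoadMetadataDetails` type from the quoted diff; the surrounding method is elided and the function name is illustrative:

```scala
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}

// Functional replacement for the var-ListBuffer loop in the quoted diff:
// collect the names of all segments that are not part of the in-progress
// overwrite in a single filter/map pass, with no mutable state.
def collectOverwrittenSegments(details: Seq[LoadMetadataDetails]): Seq[String] =
  details
    .filter(_.getSegmentStatus != SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)
    .map(_.getLoadName)
```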
nihal0107 commented on a change in pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#discussion_r533408559

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala ##########

+      for (loadMetadata <- loadMetadataDetails) {

Review comment: done

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala ##########

+        val isInsertOverwrite = (operationContext.getProperties

Review comment: done
nihal0107 commented on a change in pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#discussion_r533408885

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala ##########

+        val loadMetadataDetail = loadMetadataDetails

Review comment: removed, as the logic changed

+          for (loadMetadata <- loadMetadataDetails) {

Review comment: done

+          var segmentList = new ListBuffer[String]()

Review comment: done
nihal0107 commented on a change in pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#discussion_r533409156

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala ##########

+          if (segmentList.nonEmpty) {

Review comment: done
nihal0107 commented on a change in pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#discussion_r533409267

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala ##########

+          if (segmentList.nonEmpty) {

Review comment: done
CarbonDataQA2 commented on pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#issuecomment-736617364

Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3238/
CarbonDataQA2 commented on pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#issuecomment-736618751

Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4993/
akashrn5 commented on a change in pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#discussion_r533508633

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala ##########

@@ -398,6 +404,28 @@ object SecondaryIndexCreator {
           secondaryIndexModel.sqlContext.sparkSession,
           carbonLoadModelForMergeDataFiles.getFactTimeStamp,
           rebuiltSegments)
+
+        if (isInsertOverwrite) {
+          var staleSegmentsList = new ListBuffer[String]()

Review comment: The name `staleSegmentsList` conveys the wrong information; better to rename the variable to `overriddenSegments`.
akashrn5 commented on a change in pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#discussion_r533511617

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala ##########

@@ -398,6 +404,28 @@ object SecondaryIndexCreator {
           secondaryIndexModel.sqlContext.sparkSession,
           carbonLoadModelForMergeDataFiles.getFactTimeStamp,
           rebuiltSegments)
+
+        if (isInsertOverwrite) {
+          var staleSegmentsList = new ListBuffer[String]()
+          SegmentStatusManager
+            .readLoadMetadata(indexCarbonTable.getMetadataPath).foreach { loadMetadata =>
+            if (!successSISegments.contains(loadMetadata.getLoadName)) {

Review comment: You can use `filter` directly instead of creating a new buffer.
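A sketch of what that could look like; identifiers such as `indexCarbonTable` and `successSISegments` come from the quoted method's scope, and this is the reviewer's suggestion rather than the merged code:

```scala
// Derive the overridden segment names with filter/map instead of appending
// to a mutable ListBuffer inside foreach.
val overriddenSegments: Seq[String] = SegmentStatusManager
  .readLoadMetadata(indexCarbonTable.getMetadataPath)
  .filter(loadMetadata => !successSISegments.contains(loadMetadata.getLoadName))
  .map(_.getLoadName)
  .toSeq
```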
nihal0107 commented on a change in pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#discussion_r533571712

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala ##########

+          var staleSegmentsList = new ListBuffer[String]()

Review comment: removed