Login  Register

[GitHub] [carbondata] akashrn5 commented on a change in pull request #4015: [CARBONDATA-4052] Handled insert overwrite scenario for SI

Posted by GitBox on Nov 25, 2020; 6:43am
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-nihal0107-opened-a-new-pull-request-4015-Handled-insert-overwrite-scenario-for-SI-tp103311p103568.html


akashrn5 commented on a change in pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#discussion_r530123834



##########
File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithInsertOverwrite.scala
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex;
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterEach
+
+class TestSIWithInsertOverwrite extends QueryTest with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+    sql("drop table if exists maintable")
+    sql("create table maintable(name string, Id int, address string) stored as carbondata")
+    sql("drop index if exists maintable_si on maintable")
+    sql("CREATE INDEX maintable_si  on table maintable (address) as 'carbondata'")
+  }
+
+  test("test insert overwrite with SI") {
+    sql("insert into maintable select 'nihal',1,'nko'")
+    sql("insert into maintable select 'brinjal',2,'valid'")
+    checkAnswer(sql("select count(*) from maintable_si WHERE address='nko'"), Seq(Row(1)))
+    checkAnswer(sql("select address from maintable_si"), Seq(Row("nko"), Row("valid")))

Review comment:
       The above two checkAnswer calls are not required, as this is a basic test that is already covered in many other test cases.

##########
File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithInsertOverwrite.scala
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex;
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterEach
+
+class TestSIWithInsertOverwrite extends QueryTest with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+    sql("drop table if exists maintable")
+    sql("create table maintable(name string, Id int, address string) stored as carbondata")
+    sql("drop index if exists maintable_si on maintable")
+    sql("CREATE INDEX maintable_si  on table maintable (address) as 'carbondata'")
+  }
+
+  test("test insert overwrite with SI") {
+    sql("insert into maintable select 'nihal',1,'nko'")
+    sql("insert into maintable select 'brinjal',2,'valid'")
+    checkAnswer(sql("select count(*) from maintable_si WHERE address='nko'"), Seq(Row(1)))
+    checkAnswer(sql("select address from maintable_si"), Seq(Row("nko"), Row("valid")))
+    sql("insert overwrite table maintable select 'nihal', 1, 'asdfa'")
+    checkAnswer(sql("select count(*) from maintable_si WHERE address='nko'"), Seq(Row(0)))
+    checkAnswer(sql("select address from maintable_si"), Seq(Row("asdfa")))
+    checkAnswer(sql("select * from maintable"), Seq(Row("nihal", 1, "asdfa")))
+  }
+
+  test("test insert overwrite with CTAS and SI") {
+    sql("insert into maintable select 'nihal',1,'nko'")
+    sql("drop table if exists ctas_maintable")
+    sql("CREATE TABLE ctas_maintable " +
+      "STORED AS carbondata as select * from maintable")
+    checkAnswer(sql("select count(*) from ctas_maintable"), Seq(Row(1)))
+    assert(sql("show indexes on table ctas_maintable").collect().isEmpty)
+    sql("CREATE INDEX ctas_maintable_si  on table ctas_maintable (address) as 'carbondata'")

Review comment:
       Lines 48 and 49 are not required.

##########
File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithInsertOverwrite.scala
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex;
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterEach
+
+class TestSIWithInsertOverwrite extends QueryTest with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+    sql("drop table if exists maintable")
+    sql("create table maintable(name string, Id int, address string) stored as carbondata")
+    sql("drop index if exists maintable_si on maintable")
+    sql("CREATE INDEX maintable_si  on table maintable (address) as 'carbondata'")
+  }
+
+  test("test insert overwrite with SI") {
+    sql("insert into maintable select 'nihal',1,'nko'")
+    sql("insert into maintable select 'brinjal',2,'valid'")
+    checkAnswer(sql("select count(*) from maintable_si WHERE address='nko'"), Seq(Row(1)))
+    checkAnswer(sql("select address from maintable_si"), Seq(Row("nko"), Row("valid")))
+    sql("insert overwrite table maintable select 'nihal', 1, 'asdfa'")
+    checkAnswer(sql("select count(*) from maintable_si WHERE address='nko'"), Seq(Row(0)))
+    checkAnswer(sql("select address from maintable_si"), Seq(Row("asdfa")))
+    checkAnswer(sql("select * from maintable"), Seq(Row("nihal", 1, "asdfa")))

Review comment:
       The `select * from maintable` check is not required.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -299,6 +301,30 @@ object CarbonIndexUtil {
         .Map((carbonLoadModel.getSegmentId, carbonLoadModel.getFactTimeStamp))
     }
     val header = indexTable.getCreateOrderColumn.asScala.map(_.getColName).toArray
+    var segmentList = new ListBuffer[String]()
+    if (isInsertOverWrite) {
+      val loadMetadataDetails = carbonLoadModel.getLoadMetadataDetails.asScala
+      for (loadMetadata <- loadMetadataDetails) {
+        if (loadMetadata.getSegmentStatus != SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
+          segmentList += loadMetadata.getLoadName
+          segmentIdToLoadStartTimeMapping.put(loadMetadata.getLoadName,
+            loadMetadata.getLoadStartTime)
+        }
+      }
+      if (segmentList.nonEmpty) {

Review comment:
       In the main table load flow, we first mark the new segment as INSERT_OVERWRITE_IN_PROGRESS, then load the data, and finally mark the overwritten segments as MARKED_FOR_DELETE and the new segment as SUCCESS. Please follow the same approach for SI as well: if you mark segments for delete at the beginning, concurrent clean-files operations and queries can cause issues. So, only after the new segment is marked SUCCESS in SI should all the overwritten segments be marked for delete.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]