Kejian-Li commented on a change in pull request #3947: URL: https://github.com/apache/carbondata/pull/3947#discussion_r494964396 ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ########## @@ -18,34 +18,18 @@ package org.apache.carbondata.spark.testsuite.iud import java.text.SimpleDateFormat -import java.util -import java.util.concurrent.{Callable, ExecutorService, Executors, Future} - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.fs.Path -import org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{DataFrame, SaveMode} -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.block.SegmentProperties -import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.exception.ConcurrentOperationException -import org.apache.carbondata.core.features.TableOperation -import org.apache.carbondata.core.index.dev.cgindex.{CoarseGrainIndex, CoarseGrainIndexFactory} -import org.apache.carbondata.core.index.dev.{IndexBuilder, IndexWriter} -import org.apache.carbondata.core.index.{IndexInputSplit, IndexMeta, Segment} -import org.apache.carbondata.core.indexstore.PartitionSpec -import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, IndexSchema} -import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.events.Event +import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{DataFrame, Row, SaveMode} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} Review comment: done ########## File path: 
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/AsyncExecutorUtils.scala ########## @@ -0,0 +1,200 @@ + + + +package org.apache.carbondata.spark.testsuite.iud Review comment: done ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala ########## @@ -65,7 +65,7 @@ private[sql] case class CarbonProjectForDeleteCommand( s"Unsupported delete operation on table containing mixed format segments") } - if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) { + if (SegmentStatusManager.isInsertOverwriteInProgress(carbonTable)) { throw new ConcurrentOperationException(carbonTable, "loading", "data delete") Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li commented on a change in pull request #3947: URL: https://github.com/apache/carbondata/pull/3947#discussion_r494964644 ########## File path: integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -122,22 +139,23 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { sql("drop materialized view if exists mv4") sql("drop materialized view if exists mv5") } + dropMVs sql( "create materialized view mv1 as " + - "select timeseries(projectjoindate,'second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'second')") + "select timeseries(projectjoindate,'second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'second')") sql( "create materialized view mv2 as " + - "select timeseries(projectjoindate,'hour'), sum(projectcode) from maintable group by timeseries(projectjoindate,'hour')") + "select timeseries(projectjoindate,'hour'), sum(projectcode) from maintable group by timeseries(projectjoindate,'hour')") Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li commented on a change in pull request #3947: URL: https://github.com/apache/carbondata/pull/3947#discussion_r494964752 ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ########## @@ -170,17 +146,19 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA } test("alter rename table should fail if insert overwrite is in progress") { - val future = runSqlAsync("insert overwrite table orders select * from orders_overwrite") + sql("drop table if exists other_orders") + val future = AsyncExecutorUtils.runSqlAsync("insert overwrite table orders select * from orders_overwrite") val ex = intercept[ConcurrentOperationException] { - sql("alter table orders rename to other") + sql("alter table orders rename to other_orders") Review comment: done ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ########## @@ -92,54 +77,46 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA private def createTable(tableName: String, schema: StructType): Unit = { val schemaString = schema.fields.map(x => x.name + " " + x.dataType.typeName).mkString(", ") sql(s"CREATE TABLE $tableName ($schemaString) stored as carbondata tblproperties" + - s"('sort_scope'='local_sort','sort_columns'='o_country,o_name,o_phonetype,o_serialname," + - s"o_comment')") - } - - override def afterAll { - executorService.shutdownNow() - dropTable() + s"('sort_scope'='local_sort','sort_columns'='o_country,o_name,o_phonetype,o_serialname," + Review comment: done ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ########## @@ -68,8 +53,8 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA .mode(SaveMode.Overwrite) .save() - sql(s"insert into orders select * from 
temp_table") - sql(s"insert into orders_overwrite select * from temp_table") + sql(s"insert into orders select * from temp_table") // load_0 success + sql(s"insert into orders_overwrite select * from temp_table") // load_0 success Review comment: done ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ########## @@ -68,8 +53,8 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA .mode(SaveMode.Overwrite) .save() - sql(s"insert into orders select * from temp_table") - sql(s"insert into orders_overwrite select * from temp_table") + sql(s"insert into orders select * from temp_table") // load_0 success Review comment: done ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala ########## @@ -18,21 +18,18 @@ package org.apache.carbondata.spark.testsuite.iud import java.io.File -import org.apache.spark.sql.hive.CarbonRelation -import org.apache.spark.sql.test.SparkTestQueryExecutor -import org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.{CarbonEnv, Row, SaveMode} -import org.scalatest.BeforeAndAfterAll - import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.index.Segment import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.mutate.CarbonUpdateUtil -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.spark.sql.hive.CarbonRelation Review comment: done 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li commented on a change in pull request #3947: URL: https://github.com/apache/carbondata/pull/3947#discussion_r494965168 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/UpdateTablePreEventListener.scala ########## @@ -49,7 +49,6 @@ class UpdateTablePreEventListener extends OperationEventListener with Logging { carbonTable .getDatabaseName }.${ carbonTable.getTableName }]. Drop all indexes and retry") Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li commented on a change in pull request #3947: URL: https://github.com/apache/carbondata/pull/3947#discussion_r494965993 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/DeleteFromTableEventListener.scala ########## @@ -53,6 +53,12 @@ class DeleteFromTableEventListener extends OperationEventListener with Logging { carbonTable .getDatabaseName }.${ carbonTable.getTableName }]") + } else if (!carbonTable.getIndexesMap.isEmpty) { Review comment: I think the original code gives a clearer error message. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li commented on a change in pull request #3947: URL: https://github.com/apache/carbondata/pull/3947#discussion_r494966092 ########## File path: core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ########## @@ -918,17 +893,40 @@ public static Boolean isCompactionInProgress(CarbonTable carbonTable) { return compactionInProgress; } + /** + * Return true if insert or insert overwrite is in progress for specified table + */ + public static Boolean isInsertInProgress(CarbonTable carbonTable) { + if (carbonTable == null) { + return false; + } + boolean loadInProgress = false; + String metaPath = carbonTable.getMetadataPath(); + LoadMetadataDetails[] listOfLoadFolderDetailsArray = SegmentStatusManager.readLoadMetadata(metaPath); + if (listOfLoadFolderDetailsArray.length != 0) { + for (LoadMetadataDetails loadDetail :listOfLoadFolderDetailsArray) { + SegmentStatus segmentStatus = loadDetail.getSegmentStatus(); + if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS + || segmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) { + loadInProgress = + isLoadInProgress(carbonTable.getAbsoluteTableIdentifier(), + loadDetail.getLoadName()); + } + } + } + return loadInProgress; + } + /** * Return true if insert overwrite is in progress for specified table */ - public static Boolean isOverwriteInProgressInTable(CarbonTable carbonTable) { + public static boolean isInsertOverwriteInProgress(CarbonTable carbonTable) { if (carbonTable == null) { return false; } boolean loadInProgress = false; String metaPath = carbonTable.getMetadataPath(); - LoadMetadataDetails[] listOfLoadFolderDetailsArray = - SegmentStatusManager.readLoadMetadata(metaPath); + LoadMetadataDetails[] listOfLoadFolderDetailsArray = SegmentStatusManager.readLoadMetadata(metaPath); Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ########## @@ -918,17 +893,40 @@ public static Boolean 
isCompactionInProgress(CarbonTable carbonTable) { return compactionInProgress; } + /** + * Return true if insert or insert overwrite is in progress for specified table + */ + public static Boolean isInsertInProgress(CarbonTable carbonTable) { Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatus.java ########## @@ -31,6 +31,12 @@ @SerializedName("Success") SUCCESS("Success"), + /** Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li closed pull request #3947: URL: https://github.com/apache/carbondata/pull/3947 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li commented on pull request #3947: URL: https://github.com/apache/carbondata/pull/3947#issuecomment-698910832 > Consider a scenario in which the user wants to insert data and then update the table. The DML commands are sent one after another, but the 2nd (update) command is sent by another driver and starts to execute while the 1st (insert) is still running, so the concurrency scenario described in this PR occurs. > If we allow an update on a table that has a segment with status 'INSERT_INPROGRESS', the 2nd command (update) will execute successfully, but the data inserted by the 1st command will not be updated by the 2nd command. > In this case, the result may not be what the user expected. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li removed a comment on pull request #3947: URL: https://github.com/apache/carbondata/pull/3947#issuecomment-698910832 > Consider a scenario in which the user wants to insert data and then update the table. The DML commands are sent one after another, but the 2nd (update) command is sent by another driver and starts to execute while the 1st (insert) is still running, so the concurrency scenario described in this PR occurs. > If we allow an update on a table that has a segment with status 'INSERT_INPROGRESS', the 2nd command (update) will execute successfully, but the data inserted by the 1st command will not be updated by the 2nd command. > In this case, the result may not be what the user expected. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
marchpure commented on a change in pull request #3947: URL: https://github.com/apache/carbondata/pull/3947#discussion_r494968395 ########## File path: integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -167,18 +182,24 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { test("insert and create materialized view in progress") { sql("drop materialized view if exists mv1") - val query = s"LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE maintable " + - s"OPTIONS('DELIMITER'= ',')" - val executorService = Executors.newFixedThreadPool(4) - executorService.submit(new QueryTask(query)) - intercept[UnsupportedOperationException] { - sql( - "create materialized view mv1 as " + + + val future = AsyncExecutorUtils.runSqlAsync("insert overwrite table maintable select * from temp_maintable") + val ex = intercept[UnsupportedOperationException] { + sql("create materialized view mv1 as " + "select timeseries(projectjoindate,'year'), sum(projectcode) from maintable group by timeseries(projectjoindate,'year')") - }.getMessage - .contains("Cannot create mv materialized view table when insert is in progress on parent table: maintable") - executorService.shutdown() - executorService.awaitTermination(2, TimeUnit.HOURS) + } + assert(future.get.contains("PASS")) + assert(ex.getMessage.contains("Cannot create mv when insert overwrite is in progress on table default_maintable")) + sql("drop materialized view if exists mv1") + } + + test("create materialized view should success when parent table is insert in progress") { + sql("drop materialized view if exists mv1") + + val future = AsyncExecutorUtils.runSqlAsync("insert into table maintable select * from temp_maintable") + sql("create materialized view mv1 as " + Review comment: Need to check result of 'create materialized view' ########## File path: 
integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -33,17 +34,30 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { override def beforeAll(): Unit = { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") - drop() + dropTable() sql("CREATE TABLE maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + - "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") + "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") Review comment: take care about the format ########## File path: integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -33,17 +34,30 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { override def beforeAll(): Unit = { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") - drop() + dropTable() sql("CREATE TABLE maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + - "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") + "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") + sql(s""" + | CREATE INDEX maintable_index_test + | ON TABLE maintable (designation) + | AS '${classOf[WaitingIndexFactory].getName}' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE maintable OPTIONS |('DELIMITER'= ',', 
'QUOTECHAR'= '"')""".stripMargin) + + sql("CREATE TABLE temp_maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + Review comment: create table again? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
marchpure commented on a change in pull request #3947: URL: https://github.com/apache/carbondata/pull/3947#discussion_r494970696 ########## File path: integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -33,17 +34,30 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { override def beforeAll(): Unit = { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") - drop() + dropTable() sql("CREATE TABLE maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + - "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") + "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") + sql(s""" + | CREATE INDEX maintable_index_test + | ON TABLE maintable (designation) + | AS '${classOf[WaitingIndexFactory].getName}' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE maintable OPTIONS |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) + + sql("CREATE TABLE temp_maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + + "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") + + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE temp_maintable OPTIONS + |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) } - def drop(): Unit = { + def dropTable(): Unit = { sql("drop table if exists products") sql("drop table IF EXISTS main_table") sql("drop table IF EXISTS maintable") + sql("drop table IF EXISTS temp_maintable") Review comment: tablename 
'temp_maintable' is strange. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li commented on a change in pull request #3947: URL: https://github.com/apache/carbondata/pull/3947#discussion_r494972678 ########## File path: integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -33,17 +34,30 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { override def beforeAll(): Unit = { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") - drop() + dropTable() sql("CREATE TABLE maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + - "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") + "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") + sql(s""" + | CREATE INDEX maintable_index_test + | ON TABLE maintable (designation) + | AS '${classOf[WaitingIndexFactory].getName}' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE maintable OPTIONS |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) + + sql("CREATE TABLE temp_maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + Review comment: this is temp_maintable here, this temporary table is used to insert into maintable in order to make "insert into or insert overwrite" operation in the following. 
########## File path: integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -33,17 +34,30 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { override def beforeAll(): Unit = { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") - drop() + dropTable() sql("CREATE TABLE maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + - "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") + "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") Review comment: copy that ########## File path: integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -167,18 +182,24 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { test("insert and create materialized view in progress") { sql("drop materialized view if exists mv1") - val query = s"LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE maintable " + - s"OPTIONS('DELIMITER'= ',')" - val executorService = Executors.newFixedThreadPool(4) - executorService.submit(new QueryTask(query)) - intercept[UnsupportedOperationException] { - sql( - "create materialized view mv1 as " + + + val future = AsyncExecutorUtils.runSqlAsync("insert overwrite table maintable select * from temp_maintable") + val ex = intercept[UnsupportedOperationException] { + sql("create materialized view mv1 as " + "select timeseries(projectjoindate,'year'), sum(projectcode) from maintable group by timeseries(projectjoindate,'year')") - }.getMessage - .contains("Cannot create mv materialized view table when insert is in progress on parent table: maintable") - 
executorService.shutdown() - executorService.awaitTermination(2, TimeUnit.HOURS) + } + assert(future.get.contains("PASS")) + assert(ex.getMessage.contains("Cannot create mv when insert overwrite is in progress on table default_maintable")) + sql("drop materialized view if exists mv1") + } + + test("create materialized view should success when parent table is insert in progress") { + sql("drop materialized view if exists mv1") + + val future = AsyncExecutorUtils.runSqlAsync("insert into table maintable select * from temp_maintable") + sql("create materialized view mv1 as " + Review comment: roger that ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li commented on pull request #3947: URL: https://github.com/apache/carbondata/pull/3947#issuecomment-698910832 > Consider a scenario in which the user wants to insert data and then update the table. The DML commands are sent one after another, but the 2nd (update) command is sent by another driver and starts to execute while the 1st (insert) is still running, so the concurrency scenario described in this PR occurs. > If we allow an update on a table that has a segment with status 'INSERT_INPROGRESS', the 2nd command (update) will execute successfully, but the data inserted by the 1st command will not be updated by the 2nd command. > In this case, the result may not be what the user expected. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li removed a comment on pull request #3947: URL: https://github.com/apache/carbondata/pull/3947#issuecomment-698910832 > Consider a scenario in which the user wants to insert data and then update the table. The DML commands are sent one after another, but the 2nd (update) command is sent by another driver and starts to execute while the 1st (insert) is still running, so the concurrency scenario described in this PR occurs. > If we allow an update on a table that has a segment with status 'INSERT_INPROGRESS', the 2nd command (update) will execute successfully, but the data inserted by the 1st command will not be updated by the 2nd command. > In this case, the result may not be what the user expected. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
marchpure commented on a change in pull request #3947: URL: https://github.com/apache/carbondata/pull/3947#discussion_r494819647 ########## File path: core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatus.java ########## @@ -31,6 +31,12 @@ @SerializedName("Success") SUCCESS("Success"), + /** Review comment: revert this change ########## File path: core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ########## @@ -918,17 +893,40 @@ public static Boolean isCompactionInProgress(CarbonTable carbonTable) { return compactionInProgress; } + /** + * Return true if insert or insert overwrite is in progress for specified table + */ + public static Boolean isInsertInProgress(CarbonTable carbonTable) { + if (carbonTable == null) { + return false; + } + boolean loadInProgress = false; + String metaPath = carbonTable.getMetadataPath(); + LoadMetadataDetails[] listOfLoadFolderDetailsArray = SegmentStatusManager.readLoadMetadata(metaPath); + if (listOfLoadFolderDetailsArray.length != 0) { + for (LoadMetadataDetails loadDetail :listOfLoadFolderDetailsArray) { + SegmentStatus segmentStatus = loadDetail.getSegmentStatus(); + if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS + || segmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) { + loadInProgress = + isLoadInProgress(carbonTable.getAbsoluteTableIdentifier(), + loadDetail.getLoadName()); + } + } + } + return loadInProgress; + } + /** * Return true if insert overwrite is in progress for specified table */ - public static Boolean isOverwriteInProgressInTable(CarbonTable carbonTable) { + public static boolean isInsertOverwriteInProgress(CarbonTable carbonTable) { if (carbonTable == null) { return false; } boolean loadInProgress = false; String metaPath = carbonTable.getMetadataPath(); - LoadMetadataDetails[] listOfLoadFolderDetailsArray = - SegmentStatusManager.readLoadMetadata(metaPath); + LoadMetadataDetails[] listOfLoadFolderDetailsArray = 
SegmentStatusManager.readLoadMetadata(metaPath); Review comment: revert this change ########## File path: core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ########## @@ -918,17 +893,40 @@ public static Boolean isCompactionInProgress(CarbonTable carbonTable) { return compactionInProgress; } + /** + * Return true if insert or insert overwrite is in progress for specified table + */ + public static Boolean isInsertInProgress(CarbonTable carbonTable) { Review comment: revert this change ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/DeleteFromTableEventListener.scala ########## @@ -53,6 +53,12 @@ class DeleteFromTableEventListener extends OperationEventListener with Logging { carbonTable .getDatabaseName }.${ carbonTable.getTableName }]") + } else if (!carbonTable.getIndexesMap.isEmpty) { Review comment: change to "if (carbonTable.isIndexTable || !carbonTable.getIndexesMap.isEmpty)" ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/UpdateTablePreEventListener.scala ########## @@ -49,7 +49,6 @@ class UpdateTablePreEventListener extends OperationEventListener with Logging { carbonTable .getDatabaseName }.${ carbonTable.getTableName }]. 
Drop all indexes and retry") Review comment: revert this change ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ########## @@ -68,8 +53,8 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA .mode(SaveMode.Overwrite) .save() - sql(s"insert into orders select * from temp_table") - sql(s"insert into orders_overwrite select * from temp_table") + sql(s"insert into orders select * from temp_table") // load_0 success Review comment: revert this change ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala ########## @@ -18,21 +18,18 @@ package org.apache.carbondata.spark.testsuite.iud import java.io.File -import org.apache.spark.sql.hive.CarbonRelation -import org.apache.spark.sql.test.SparkTestQueryExecutor -import org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.{CarbonEnv, Row, SaveMode} -import org.scalatest.BeforeAndAfterAll - import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.index.Segment import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.mutate.CarbonUpdateUtil -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.spark.sql.hive.CarbonRelation Review comment: revert this change ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ########## @@ -68,8 +53,8 @@ class TestInsertAndOtherCommandConcurrent extends 
QueryTest with BeforeAndAfterA .mode(SaveMode.Overwrite) .save() - sql(s"insert into orders select * from temp_table") - sql(s"insert into orders_overwrite select * from temp_table") + sql(s"insert into orders select * from temp_table") // load_0 success + sql(s"insert into orders_overwrite select * from temp_table") // load_0 success Review comment: revert this change ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ########## @@ -92,54 +77,46 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA private def createTable(tableName: String, schema: StructType): Unit = { val schemaString = schema.fields.map(x => x.name + " " + x.dataType.typeName).mkString(", ") sql(s"CREATE TABLE $tableName ($schemaString) stored as carbondata tblproperties" + - s"('sort_scope'='local_sort','sort_columns'='o_country,o_name,o_phonetype,o_serialname," + - s"o_comment')") - } - - override def afterAll { - executorService.shutdownNow() - dropTable() + s"('sort_scope'='local_sort','sort_columns'='o_country,o_name,o_phonetype,o_serialname," + Review comment: revert this change ########## File path: integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -122,22 +139,23 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { sql("drop materialized view if exists mv4") sql("drop materialized view if exists mv5") } + dropMVs sql( "create materialized view mv1 as " + - "select timeseries(projectjoindate,'second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'second')") + "select timeseries(projectjoindate,'second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'second')") sql( "create materialized view mv2 as " + - "select timeseries(projectjoindate,'hour'), sum(projectcode) from maintable group by timeseries(projectjoindate,'hour')") + "select 
timeseries(projectjoindate,'hour'), sum(projectcode) from maintable group by timeseries(projectjoindate,'hour')") Review comment: revert all format change ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ########## @@ -170,17 +146,19 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA } test("alter rename table should fail if insert overwrite is in progress") { - val future = runSqlAsync("insert overwrite table orders select * from orders_overwrite") + sql("drop table if exists other_orders") + val future = AsyncExecutorUtils.runSqlAsync("insert overwrite table orders select * from orders_overwrite") val ex = intercept[ConcurrentOperationException] { - sql("alter table orders rename to other") + sql("alter table orders rename to other_orders") Review comment: other_orders => different orders ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala ########## @@ -65,7 +65,7 @@ private[sql] case class CarbonProjectForDeleteCommand( s"Unsupported delete operation on table containing mixed format segments") } - if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) { + if (SegmentStatusManager.isInsertOverwriteInProgress(carbonTable)) { throw new ConcurrentOperationException(carbonTable, "loading", "data delete") Review comment: loading->insert overwrite ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ########## @@ -18,34 +18,18 @@ package org.apache.carbondata.spark.testsuite.iud import java.text.SimpleDateFormat -import java.util -import java.util.concurrent.{Callable, ExecutorService, Executors, Future} - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.fs.Path -import org.apache.spark.sql.test.util.QueryTest -import 
org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{DataFrame, SaveMode} -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.block.SegmentProperties -import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.exception.ConcurrentOperationException -import org.apache.carbondata.core.features.TableOperation -import org.apache.carbondata.core.index.dev.cgindex.{CoarseGrainIndex, CoarseGrainIndexFactory} -import org.apache.carbondata.core.index.dev.{IndexBuilder, IndexWriter} -import org.apache.carbondata.core.index.{IndexInputSplit, IndexMeta, Segment} -import org.apache.carbondata.core.indexstore.PartitionSpec -import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, IndexSchema} -import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.events.Event +import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{DataFrame, Row, SaveMode} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} Review comment: revert this change ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/AsyncExecutorUtils.scala ########## @@ -0,0 +1,200 @@ + + + +package org.apache.carbondata.spark.testsuite.iud Review comment: remove into util package ########## File path: integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -167,18 +182,24 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { test("insert and create materialized view in progress") { sql("drop materialized view if exists mv1") - val query = s"LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE maintable " + - s"OPTIONS('DELIMITER'= ',')" - 
val executorService = Executors.newFixedThreadPool(4) - executorService.submit(new QueryTask(query)) - intercept[UnsupportedOperationException] { - sql( - "create materialized view mv1 as " + + + val future = AsyncExecutorUtils.runSqlAsync("insert overwrite table maintable select * from temp_maintable") + val ex = intercept[UnsupportedOperationException] { + sql("create materialized view mv1 as " + "select timeseries(projectjoindate,'year'), sum(projectcode) from maintable group by timeseries(projectjoindate,'year')") - }.getMessage - .contains("Cannot create mv materialized view table when insert is in progress on parent table: maintable") - executorService.shutdown() - executorService.awaitTermination(2, TimeUnit.HOURS) + } + assert(future.get.contains("PASS")) + assert(ex.getMessage.contains("Cannot create mv when insert overwrite is in progress on table default_maintable")) + sql("drop materialized view if exists mv1") + } + + test("create materialized view should success when parent table is insert in progress") { + sql("drop materialized view if exists mv1") + + val future = AsyncExecutorUtils.runSqlAsync("insert into table maintable select * from temp_maintable") + sql("create materialized view mv1 as " + Review comment: Need to check result of 'create materialized view' ########## File path: integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -33,17 +34,30 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { override def beforeAll(): Unit = { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") - drop() + dropTable() sql("CREATE TABLE maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + - "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") 
+ "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") Review comment: take care about the format ########## File path: integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -33,17 +34,30 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { override def beforeAll(): Unit = { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") - drop() + dropTable() sql("CREATE TABLE maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + - "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") + "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") + sql(s""" + | CREATE INDEX maintable_index_test + | ON TABLE maintable (designation) + | AS '${classOf[WaitingIndexFactory].getName}' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE maintable OPTIONS |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) + + sql("CREATE TABLE temp_maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + Review comment: create table again? 
########## File path: integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -33,17 +34,30 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { override def beforeAll(): Unit = { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") - drop() + dropTable() sql("CREATE TABLE maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + - "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") + "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") + sql(s""" + | CREATE INDEX maintable_index_test + | ON TABLE maintable (designation) + | AS '${classOf[WaitingIndexFactory].getName}' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE maintable OPTIONS |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) + + sql("CREATE TABLE temp_maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + + "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") + + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE temp_maintable OPTIONS + |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) } - def drop(): Unit = { + def dropTable(): Unit = { sql("drop table if exists products") sql("drop table IF EXISTS main_table") sql("drop table IF EXISTS maintable") + sql("drop table IF EXISTS temp_maintable") Review comment: tablename 'temp_maintable' is strange. 
########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala ########## @@ -322,7 +321,32 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { } - test("block deleting records from table which has index") { + // block delete operation from table which has index. see PR2483 + test("delete should fail if the table has index") { Review comment: Duplicate Test. Please have a check about another testcase 'delete operation is not supported for index' ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ########## @@ -18,45 +18,31 @@ package org.apache.carbondata.spark.testsuite.iud import java.text.SimpleDateFormat -import java.util -import java.util.concurrent.{Callable, ExecutorService, Executors, Future} -import scala.collection.JavaConverters._ +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.exception.ConcurrentOperationException +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.spark.util.{AsyncExecutorUtils, Global, WaitingIndexFactory} -import org.apache.hadoop.fs.Path import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{DataFrame, SaveMode} +import org.apache.spark.sql.{DataFrame, Row, SaveMode} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.block.SegmentProperties -import org.apache.carbondata.core.datastore.page.ColumnPage -import org.apache.carbondata.core.exception.ConcurrentOperationException -import org.apache.carbondata.core.features.TableOperation -import org.apache.carbondata.core.index.dev.cgindex.{CoarseGrainIndex, CoarseGrainIndexFactory} -import 
org.apache.carbondata.core.index.dev.{IndexBuilder, IndexWriter} -import org.apache.carbondata.core.index.{IndexInputSplit, IndexMeta, Segment} -import org.apache.carbondata.core.indexstore.PartitionSpec -import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, IndexSchema} -import org.apache.carbondata.core.scan.filter.intf.ExpressionType -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.events.Event - // This testsuite test insert and insert overwrite with other commands concurrently class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach { - private val executorService: ExecutorService = Executors.newFixedThreadPool(10) + Review comment: donot keep blank line ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ########## @@ -18,45 +18,31 @@ package org.apache.carbondata.spark.testsuite.iud import java.text.SimpleDateFormat -import java.util -import java.util.concurrent.{Callable, ExecutorService, Executors, Future} -import scala.collection.JavaConverters._ +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.exception.ConcurrentOperationException +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.spark.util.{AsyncExecutorUtils, Global, WaitingIndexFactory} -import org.apache.hadoop.fs.Path import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{DataFrame, SaveMode} +import org.apache.spark.sql.{DataFrame, Row, SaveMode} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.block.SegmentProperties -import org.apache.carbondata.core.datastore.page.ColumnPage -import 
org.apache.carbondata.core.exception.ConcurrentOperationException -import org.apache.carbondata.core.features.TableOperation -import org.apache.carbondata.core.index.dev.cgindex.{CoarseGrainIndex, CoarseGrainIndexFactory} -import org.apache.carbondata.core.index.dev.{IndexBuilder, IndexWriter} -import org.apache.carbondata.core.index.{IndexInputSplit, IndexMeta, Segment} -import org.apache.carbondata.core.indexstore.PartitionSpec -import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, IndexSchema} -import org.apache.carbondata.core.scan.filter.intf.ExpressionType -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.events.Event - // This testsuite test insert and insert overwrite with other commands concurrently class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach { - private val executorService: ExecutorService = Executors.newFixedThreadPool(10) + var testData: DataFrame = _ override def beforeAll { dropTable() buildTestData() - + Review comment: donot keep blank line ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala ########## @@ -794,7 +795,31 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { sql("drop table if exists senten") } - test("block updating table which has index") { + // block update operation from table which has index. see PR2483 + test("update should fail if the table has index") { Review comment: Duplicate testcase. 
Please check the testcase of 'update operation is not supported for index' ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ########## @@ -18,34 +18,18 @@ package org.apache.carbondata.spark.testsuite.iud import java.text.SimpleDateFormat -import java.util -import java.util.concurrent.{Callable, ExecutorService, Executors, Future} - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.fs.Path -import org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{DataFrame, SaveMode} -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.block.SegmentProperties -import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.exception.ConcurrentOperationException -import org.apache.carbondata.core.features.TableOperation -import org.apache.carbondata.core.index.dev.cgindex.{CoarseGrainIndex, CoarseGrainIndexFactory} -import org.apache.carbondata.core.index.dev.{IndexBuilder, IndexWriter} -import org.apache.carbondata.core.index.{IndexInputSplit, IndexMeta, Segment} -import org.apache.carbondata.core.indexstore.PartitionSpec -import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, IndexSchema} -import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.events.Event +import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{DataFrame, Row, SaveMode} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} Review comment: donot change the order of spark importing and carbondata importing ---------------------------------------------------------------- This is an automated message from the 
Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li commented on a change in pull request #3947: URL: https://github.com/apache/carbondata/pull/3947#discussion_r494964396 ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ########## @@ -18,34 +18,18 @@ package org.apache.carbondata.spark.testsuite.iud import java.text.SimpleDateFormat -import java.util -import java.util.concurrent.{Callable, ExecutorService, Executors, Future} - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.fs.Path -import org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{DataFrame, SaveMode} -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.block.SegmentProperties -import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.exception.ConcurrentOperationException -import org.apache.carbondata.core.features.TableOperation -import org.apache.carbondata.core.index.dev.cgindex.{CoarseGrainIndex, CoarseGrainIndexFactory} -import org.apache.carbondata.core.index.dev.{IndexBuilder, IndexWriter} -import org.apache.carbondata.core.index.{IndexInputSplit, IndexMeta, Segment} -import org.apache.carbondata.core.indexstore.PartitionSpec -import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, IndexSchema} -import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.events.Event +import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{DataFrame, Row, SaveMode} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} Review comment: done ########## File path: 
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/AsyncExecutorUtils.scala ########## @@ -0,0 +1,200 @@ + + + +package org.apache.carbondata.spark.testsuite.iud Review comment: done ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala ########## @@ -65,7 +65,7 @@ private[sql] case class CarbonProjectForDeleteCommand( s"Unsupported delete operation on table containing mixed format segments") } - if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) { + if (SegmentStatusManager.isInsertOverwriteInProgress(carbonTable)) { throw new ConcurrentOperationException(carbonTable, "loading", "data delete") Review comment: done ########## File path: integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -122,22 +139,23 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { sql("drop materialized view if exists mv4") sql("drop materialized view if exists mv5") } + dropMVs sql( "create materialized view mv1 as " + - "select timeseries(projectjoindate,'second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'second')") + "select timeseries(projectjoindate,'second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'second')") sql( "create materialized view mv2 as " + - "select timeseries(projectjoindate,'hour'), sum(projectcode) from maintable group by timeseries(projectjoindate,'hour')") + "select timeseries(projectjoindate,'hour'), sum(projectcode) from maintable group by timeseries(projectjoindate,'hour')") Review comment: done ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ########## @@ -170,17 +146,19 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA } test("alter rename table should fail if insert overwrite 
is in progress") { - val future = runSqlAsync("insert overwrite table orders select * from orders_overwrite") + sql("drop table if exists other_orders") + val future = AsyncExecutorUtils.runSqlAsync("insert overwrite table orders select * from orders_overwrite") val ex = intercept[ConcurrentOperationException] { - sql("alter table orders rename to other") + sql("alter table orders rename to other_orders") Review comment: done ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ########## @@ -92,54 +77,46 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA private def createTable(tableName: String, schema: StructType): Unit = { val schemaString = schema.fields.map(x => x.name + " " + x.dataType.typeName).mkString(", ") sql(s"CREATE TABLE $tableName ($schemaString) stored as carbondata tblproperties" + - s"('sort_scope'='local_sort','sort_columns'='o_country,o_name,o_phonetype,o_serialname," + - s"o_comment')") - } - - override def afterAll { - executorService.shutdownNow() - dropTable() + s"('sort_scope'='local_sort','sort_columns'='o_country,o_name,o_phonetype,o_serialname," + Review comment: done ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ########## @@ -68,8 +53,8 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA .mode(SaveMode.Overwrite) .save() - sql(s"insert into orders select * from temp_table") - sql(s"insert into orders_overwrite select * from temp_table") + sql(s"insert into orders select * from temp_table") // load_0 success + sql(s"insert into orders_overwrite select * from temp_table") // load_0 success Review comment: done ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ########## @@ -68,8 +53,8 @@ class 
TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA .mode(SaveMode.Overwrite) .save() - sql(s"insert into orders select * from temp_table") - sql(s"insert into orders_overwrite select * from temp_table") + sql(s"insert into orders select * from temp_table") // load_0 success Review comment: done ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala ########## @@ -18,21 +18,18 @@ package org.apache.carbondata.spark.testsuite.iud import java.io.File -import org.apache.spark.sql.hive.CarbonRelation -import org.apache.spark.sql.test.SparkTestQueryExecutor -import org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.{CarbonEnv, Row, SaveMode} -import org.scalatest.BeforeAndAfterAll - import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.index.Segment import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.mutate.CarbonUpdateUtil -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.spark.sql.hive.CarbonRelation Review comment: done ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/UpdateTablePreEventListener.scala ########## @@ -49,7 +49,6 @@ class UpdateTablePreEventListener extends OperationEventListener with Logging { carbonTable .getDatabaseName }.${ carbonTable.getTableName }]. 
Drop all indexes and retry") Review comment: done ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/DeleteFromTableEventListener.scala ########## @@ -53,6 +53,12 @@ class DeleteFromTableEventListener extends OperationEventListener with Logging { carbonTable .getDatabaseName }.${ carbonTable.getTableName }]") + } else if (!carbonTable.getIndexesMap.isEmpty) { Review comment: I think that the original code has clearer error information ########## File path: core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ########## @@ -918,17 +893,40 @@ public static Boolean isCompactionInProgress(CarbonTable carbonTable) { return compactionInProgress; } + /** + * Return true if insert or insert overwrite is in progress for specified table + */ + public static Boolean isInsertInProgress(CarbonTable carbonTable) { + if (carbonTable == null) { + return false; + } + boolean loadInProgress = false; + String metaPath = carbonTable.getMetadataPath(); + LoadMetadataDetails[] listOfLoadFolderDetailsArray = SegmentStatusManager.readLoadMetadata(metaPath); + if (listOfLoadFolderDetailsArray.length != 0) { + for (LoadMetadataDetails loadDetail :listOfLoadFolderDetailsArray) { + SegmentStatus segmentStatus = loadDetail.getSegmentStatus(); + if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS + || segmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) { + loadInProgress = + isLoadInProgress(carbonTable.getAbsoluteTableIdentifier(), + loadDetail.getLoadName()); + } + } + } + return loadInProgress; + } + /** * Return true if insert overwrite is in progress for specified table */ - public static Boolean isOverwriteInProgressInTable(CarbonTable carbonTable) { + public static boolean isInsertOverwriteInProgress(CarbonTable carbonTable) { if (carbonTable == null) { return false; } boolean loadInProgress = false; String metaPath = carbonTable.getMetadataPath(); - LoadMetadataDetails[] 
listOfLoadFolderDetailsArray = - SegmentStatusManager.readLoadMetadata(metaPath); + LoadMetadataDetails[] listOfLoadFolderDetailsArray = SegmentStatusManager.readLoadMetadata(metaPath); Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ########## @@ -918,17 +893,40 @@ public static Boolean isCompactionInProgress(CarbonTable carbonTable) { return compactionInProgress; } + /** + * Return true if insert or insert overwrite is in progress for specified table + */ + public static Boolean isInsertInProgress(CarbonTable carbonTable) { Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatus.java ########## @@ -31,6 +31,12 @@ @SerializedName("Success") SUCCESS("Success"), + /** Review comment: done ########## File path: integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -33,17 +34,30 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { override def beforeAll(): Unit = { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") - drop() + dropTable() sql("CREATE TABLE maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + - "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") + "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") + sql(s""" + | CREATE INDEX maintable_index_test + | ON TABLE maintable (designation) + | AS '${classOf[WaitingIndexFactory].getName}' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE maintable OPTIONS |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) + + 
sql("CREATE TABLE temp_maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + Review comment: this is temp_maintable here, this temporary table is used to insert into maintable in order to make "insert into or insert overwrite" operation in the following. ########## File path: integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -33,17 +34,30 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { override def beforeAll(): Unit = { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") - drop() + dropTable() sql("CREATE TABLE maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + - "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") + "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") Review comment: copy that ########## File path: integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -167,18 +182,24 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { test("insert and create materialized view in progress") { sql("drop materialized view if exists mv1") - val query = s"LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE maintable " + - s"OPTIONS('DELIMITER'= ',')" - val executorService = Executors.newFixedThreadPool(4) - executorService.submit(new QueryTask(query)) - intercept[UnsupportedOperationException] { - sql( - "create materialized view mv1 as " + + + val future = AsyncExecutorUtils.runSqlAsync("insert overwrite table maintable select * from temp_maintable") + val ex = 
intercept[UnsupportedOperationException] { + sql("create materialized view mv1 as " + "select timeseries(projectjoindate,'year'), sum(projectcode) from maintable group by timeseries(projectjoindate,'year')") - }.getMessage - .contains("Cannot create mv materialized view table when insert is in progress on parent table: maintable") - executorService.shutdown() - executorService.awaitTermination(2, TimeUnit.HOURS) + } + assert(future.get.contains("PASS")) + assert(ex.getMessage.contains("Cannot create mv when insert overwrite is in progress on table default_maintable")) + sql("drop materialized view if exists mv1") + } + + test("create materialized view should success when parent table is insert in progress") { + sql("drop materialized view if exists mv1") + + val future = AsyncExecutorUtils.runSqlAsync("insert into table maintable select * from temp_maintable") + sql("create materialized view mv1 as " + Review comment: roger that ########## File path: integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -33,17 +34,30 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { override def beforeAll(): Unit = { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") - drop() + dropTable() sql("CREATE TABLE maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + - "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") + "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS carbondata") Review comment: copy that ########## File path: integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala ########## @@ -167,18 +182,24 @@ class 
TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll { test("insert and create materialized view in progress") { sql("drop materialized view if exists mv1") - val query = s"LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE maintable " + - s"OPTIONS('DELIMITER'= ',')" - val executorService = Executors.newFixedThreadPool(4) - executorService.submit(new QueryTask(query)) - intercept[UnsupportedOperationException] { - sql( - "create materialized view mv1 as " + + + val future = AsyncExecutorUtils.runSqlAsync("insert overwrite table maintable select * from temp_maintable") + val ex = intercept[UnsupportedOperationException] { + sql("create materialized view mv1 as " + "select timeseries(projectjoindate,'year'), sum(projectcode) from maintable group by timeseries(projectjoindate,'year')") - }.getMessage - .contains("Cannot create mv materialized view table when insert is in progress on parent table: maintable") - executorService.shutdown() - executorService.awaitTermination(2, TimeUnit.HOURS) + } + assert(future.get.contains("PASS")) + assert(ex.getMessage.contains("Cannot create mv when insert overwrite is in progress on table default_maintable")) + sql("drop materialized view if exists mv1") + } + + test("create materialized view should success when parent table is insert in progress") { + sql("drop materialized view if exists mv1") + + val future = AsyncExecutorUtils.runSqlAsync("insert into table maintable select * from temp_maintable") + sql("create materialized view mv1 as " + Review comment: roger that ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li closed pull request #3947: URL: https://github.com/apache/carbondata/pull/3947 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA1 commented on pull request #3947: URL: https://github.com/apache/carbondata/pull/3947#issuecomment-698817357 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA1 commented on pull request #3947: URL: https://github.com/apache/carbondata/pull/3947#issuecomment-698958049 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2479/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA1 commented on pull request #3947: URL: https://github.com/apache/carbondata/pull/3947#issuecomment-698959902 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4223/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
Free forum by Nabble | Edit this page |