[GitHub] [carbondata] jackylk commented on a change in pull request #3415: [CARBONDATA-3553] Support SDK Writer using existing schema file

GitBox
jackylk commented on a change in pull request #3415: [CARBONDATA-3553] Support SDK Writer using existing schema file
URL: https://github.com/apache/carbondata/pull/3415#discussion_r336951907
 
 

 ##########
 File path: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sdk/TestSDKWithTransactionalTable.scala
 ##########
 @@ -0,0 +1,85 @@
+package org.apache.carbondata.spark.testsuite.sdk
+
+import java.io.{BufferedWriter, File, FileWriter}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.sdk.file.CarbonReader
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestSDKWithTransactionalTable extends QueryTest with BeforeAndAfterAll {
+  var filePath: String = _
+
+  def buildTestData(): Unit = {
+    filePath = s"${integrationPath}/spark-common-test/target/big.csv"
+    val file = new File(filePath)
+    val writer = new BufferedWriter(new FileWriter(file))
+    writer.write("c1, c2, c3, c4, c5, c6, c7, c8, c9, c10")
+    writer.newLine()
+    for (i <- 0 until 10) {
+      writer.write("a" + 1%1000 + "," +
+                   "b" + 1%1000 + "," +
+                   "c" + 1%1000 + "," +
+                   "d" + 1%1000 + "," +
+                   "e" + 1%1000 + "," +
+                   "f" + 1%1000 + "," +
+                   1%1000 + "," +
+                   1%1000 + "," +
+                   1%1000 + "," +
+                   1%1000 + "\n")
+      if ( i % 10000 == 0) {
+        writer.flush()
+      }
+    }
+    writer.close()
+  }
+
+  def dropTable(): Unit = {
+    sql("DROP TABLE IF EXISTS carbon_load1")
+    sql("DROP TABLE IF EXISTS train")
+    sql("DROP TABLE IF EXISTS test")
+  }
+
+  override def beforeAll(): Unit = {
+    dropTable()
+    buildTestData()
+  }
+
+  test("test sdk with transactional table, read as row") {
+
+    sql(
+      """
+        | CREATE TABLE carbon_load1(
+        |    c1 string, c2 string, c3 string, c4 string, c5 string,
+        |    c6 string, c7 int, c8 int, c9 int, c10 int)
+        | STORED AS carbondata
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1")
+
+    val table = CarbonEnv.getCarbonTable(None, "carbon_load1")(sqlContext.sparkSession)
+    val reader = CarbonReader.builder(table.getTablePath, table.getTableName).build()
+
+    var count = 0
+    while (reader.hasNext) {
+      reader.readNextRow
+      count += 1
+    }
+    reader.close()
+
+    checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(count)))
+    sql("DROP TABLE carbon_load1")
+  }
+
+  override def afterAll(): Unit = {
+    new File(filePath).delete()
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+  }
+
+}
 
 Review comment:
  Since the writer now supports transactional tables as well, can you add a test case that uses the SDK to write a segment, adds it with the ADD SEGMENT SQL statement, and then queries it?
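  For reference, a rough sketch of what such a test might look like (untested; the output path, table name, and the exact ADD SEGMENT OPTIONS keys are assumptions based on the public SDK writer API and the segment-management syntax, not on this PR):

    test("test sdk write followed by ADD SEGMENT") {
      import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}

      // Hypothetical external location for the SDK-written segment.
      val writePath = s"$integrationPath/spark-common-test/target/sdk_segment"

      // Write a few rows through the SDK writer (CSV input variant).
      val schema = new Schema(Array(new Field("c1", "string"), new Field("c7", "int")))
      val writer = CarbonWriter.builder()
        .outputPath(writePath)
        .withCsvInput(schema)
        .writtenBy("TestSDKWithTransactionalTable")
        .build()
      (1 to 10).foreach(i => writer.write(Array("a" + i, i.toString)))
      writer.close()

      // Register the SDK output as a segment of a transactional table, then query it.
      // The OPTIONS keys below are assumed; adjust to the final ADD SEGMENT syntax.
      sql("CREATE TABLE sdk_add_segment(c1 string, c7 int) STORED AS carbondata")
      sql(s"ALTER TABLE sdk_add_segment ADD SEGMENT " +
          s"OPTIONS('path'='$writePath', 'format'='carbon')")
      checkAnswer(sql("SELECT count(*) FROM sdk_add_segment"), Seq(Row(10)))
      sql("DROP TABLE sdk_add_segment")
    }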
