[GitHub] carbondata pull request #2646: [WIP] Support SDK writer as thread safe api


[GitHub] carbondata issue #2646: [CARBONDATA-2874] Support SDK writer as thread safe ...

qiuchenjian-2
Github user ajantha-bhat commented on the issue:

    https://github.com/apache/carbondata/pull/2646
 
    retest SDV please


---

[GitHub] carbondata issue #2646: [CARBONDATA-2874] Support SDK writer as thread safe ...

qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2646
 
    SDV Build Fail, please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6335/



---

[GitHub] carbondata pull request #2646: [CARBONDATA-2874] Support SDK writer as threa...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2646#discussion_r212350373
 
    --- Diff: store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.sdk.file;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +
    +import org.apache.commons.io.FileUtils;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +/**
    + * multi-thread Test suite for {@link CSVCarbonWriter}
    + */
    +public class ConcurrentSdkWriterTest {
    +
    +  @Test public void testWriteFiles() throws IOException {
    +    String path = "./testWriteFiles";
    +    FileUtils.deleteDirectory(new File(path));
    +
    +    Field[] fields = new Field[2];
    +    fields[0] = new Field("name", DataTypes.STRING);
    +    fields[1] = new Field("age", DataTypes.INT);
    +
    +    int poolSize = 6;
    +    ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
    +    try {
    +      CarbonWriterBuilder builder =
    +          CarbonWriter.builder().outputPath(path).uniqueIdentifier(555).taskNo(123);
    +      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
    +      // write in multi-thread
    +      for (int i = 0; i < poolSize; i++) {
    +        executorService.submit(new writeLogic(writer));
    --- End diff --
   
    writeLogic should be WriteLogic: Java class names use UpperCamelCase.
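A minimal sketch of the renamed task, assuming a hypothetical `RowWriter` interface that stands in for `CarbonWriter` so the example runs on a plain JDK. `WriteLogicDemo` and `runTasks` are also illustrative names. Six tasks of 10 rows each give the 60 rows the test expects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for CarbonWriter so the sketch runs without CarbonData.
interface RowWriter {
  void write(Object row);
}

// UpperCamelCase class name: WriteLogic, not writeLogic. Top-level here (or a
// static nested class in the test) so it does not capture the enclosing test instance.
class WriteLogic implements Runnable {
  private final RowWriter writer;

  WriteLogic(RowWriter writer) {
    this.writer = writer;
  }

  @Override
  public void run() {
    // Each task writes 10 rows, as in the PR's test.
    for (int i = 0; i < 10; i++) {
      writer.write(new String[] { "robot" + (i % 10), String.valueOf(i) });
    }
  }
}

class WriteLogicDemo {
  // Runs poolSize WriteLogic tasks on separate threads against one thread-safe
  // sink and returns the number of rows written.
  static int runTasks(int poolSize) {
    Queue<Object> sink = new ConcurrentLinkedQueue<>();
    RowWriter writer = sink::add;
    List<Thread> threads = new ArrayList<>();
    for (int t = 0; t < poolSize; t++) {
      Thread thread = new Thread(new WriteLogic(writer));
      threads.add(thread);
      thread.start();
    }
    try {
      for (Thread thread : threads) {
        thread.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for writers", e);
    }
    return sink.size();
  }
}
```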


---

[GitHub] carbondata pull request #2646: [CARBONDATA-2874] Support SDK writer as threa...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2646#discussion_r212350482
 
    --- Diff: store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.sdk.file;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +
    +import org.apache.commons.io.FileUtils;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +/**
    + * multi-thread Test suite for {@link CSVCarbonWriter}
    + */
    +public class ConcurrentSdkWriterTest {
    +
    +  @Test public void testWriteFiles() throws IOException {
    +    String path = "./testWriteFiles";
    +    FileUtils.deleteDirectory(new File(path));
    +
    +    Field[] fields = new Field[2];
    +    fields[0] = new Field("name", DataTypes.STRING);
    +    fields[1] = new Field("age", DataTypes.INT);
    +
    +    int poolSize = 6;
    +    ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
    +    try {
    +      CarbonWriterBuilder builder =
    +          CarbonWriter.builder().outputPath(path).uniqueIdentifier(555).taskNo(123);
    +      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
    +      // write in multi-thread
    +      for (int i = 0; i < poolSize; i++) {
    +        executorService.submit(new writeLogic(writer));
    +      }
    +      executorService.shutdown();
    +      executorService.awaitTermination(2, TimeUnit.HOURS);
    +      writer.close();
    +    } catch (Exception e) {
    +      e.printStackTrace();
    --- End diff --
   
    This should fail the test instead of only printing the stack trace.
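One way to act on this, sketched without JUnit so it runs on a plain JDK: stop catching `Exception`, treat a `false` return from `awaitTermination` as a failure, and call `get()` on every `Future` so a worker's exception is rethrown. `ConcurrentRunner` and `runAll` are hypothetical names; in the PR's test the same logic would presumably sit directly in `testWriteFiles`, which would then declare `throws Exception`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ConcurrentRunner {
  // Runs `task` poolSize times in parallel and throws if any run fails or the
  // pool does not finish in time. Nothing is caught and printed, so inside a
  // JUnit @Test the exception escapes and the test is reported as failed.
  static void runAll(int poolSize, Callable<Void> task, long timeout, TimeUnit unit)
      throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      List<Future<Void>> futures = new ArrayList<>();
      for (int i = 0; i < poolSize; i++) {
        futures.add(pool.submit(task));
      }
      pool.shutdown();
      // awaitTermination returns false on timeout; ignoring it would hide a hang.
      if (!pool.awaitTermination(timeout, unit)) {
        throw new AssertionError("tasks did not finish within " + timeout + " " + unit);
      }
      // get() rethrows a task's failure wrapped in an ExecutionException.
      for (Future<Void> future : futures) {
        future.get();
      }
    } finally {
      pool.shutdownNow();
    }
  }
}
```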


---

[GitHub] carbondata pull request #2646: [CARBONDATA-2874] Support SDK writer as threa...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2646#discussion_r212350574
 
    --- Diff: store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.sdk.file;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +
    +import org.apache.commons.io.FileUtils;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +/**
    + * multi-thread Test suite for {@link CSVCarbonWriter}
    + */
    +public class ConcurrentSdkWriterTest {
    +
    +  @Test public void testWriteFiles() throws IOException {
    +    String path = "./testWriteFiles";
    +    FileUtils.deleteDirectory(new File(path));
    +
    +    Field[] fields = new Field[2];
    +    fields[0] = new Field("name", DataTypes.STRING);
    +    fields[1] = new Field("age", DataTypes.INT);
    +
    +    int poolSize = 6;
    +    ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
    +    try {
    +      CarbonWriterBuilder builder =
    +          CarbonWriter.builder().outputPath(path).uniqueIdentifier(555).taskNo(123);
    +      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
    +      // write in multi-thread
    +      for (int i = 0; i < poolSize; i++) {
    +        executorService.submit(new writeLogic(writer));
    +      }
    +      executorService.shutdown();
    +      executorService.awaitTermination(2, TimeUnit.HOURS);
    +      writer.close();
    +    } catch (Exception e) {
    +      e.printStackTrace();
    +    }
    +
    +    // read the files and verify the count
    +    CarbonReader reader;
    +    try {
    +      reader =
    +          CarbonReader.builder(path, "_temp").projection(new String[] { "name", "age" }).build();
    +      int i = 0;
    +      while (reader.hasNext()) {
    +        Object[] row = (Object[]) reader.readNextRow();
    +        i++;
    +      }
    +      // count should be 60 records
    +      Assert.assertEquals(i, 60);
    +      reader.close();
    +    } catch (InterruptedException e) {
    +      e.printStackTrace();
    +    }
    +
    +    FileUtils.deleteDirectory(new File(path));
    +  }
    +
    +  class writeLogic implements Runnable {
    +    CarbonWriter writer;
    +
    +    writeLogic(CarbonWriter writer) {
    +      this.writer = writer;
    +    }
    +
    +    @Override public void run() {
    +
    +      try {
    +        for (int i = 0; i < 10; i++) {
    +          writer.write(new String[] { "robot" + (i % 10), String.valueOf(i),
    +              String.valueOf((double) i / 2) });
    +        }
    +      } catch (IOException e) {
    +        e.printStackTrace();
    --- End diff --
   
    This should also fail the test instead of only printing the stack trace.
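Catching the `IOException` inside `run()` cannot fail the test by itself: the exception stays on the worker thread, and `executorService.submit(...)` discards the returned `Future`. A minimal sketch, assuming a hypothetical `CheckedRowWriter` interface in place of `CarbonWriter`: the task becomes a `Callable<Void>`, whose `call()` may throw checked exceptions, so the `IOException` reaches the caller of `Future.get()` as the cause of an `ExecutionException`. `CallableWriteLogic` and `WriteTaskDemo` are illustrative names.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for CarbonWriter#write, which throws IOException.
interface CheckedRowWriter {
  void write(Object row) throws IOException;
}

// Callable instead of Runnable: the IOException is declared, not caught and printed.
class CallableWriteLogic implements Callable<Void> {
  private final CheckedRowWriter writer;
  private final int rows;

  CallableWriteLogic(CheckedRowWriter writer, int rows) {
    this.writer = writer;
    this.rows = rows;
  }

  @Override
  public Void call() throws IOException {
    for (int i = 0; i < rows; i++) {
      writer.write(new String[] { "robot" + (i % 10), String.valueOf(i) });
    }
    return null;
  }
}

class WriteTaskDemo {
  // Submits one task and returns the exception it failed with, or null on success.
  static Throwable runAndGetFailure(CheckedRowWriter writer, int rows) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      pool.submit(new CallableWriteLogic(writer, rows)).get();
      return null;
    } catch (ExecutionException e) {
      return e.getCause(); // the IOException thrown inside call()
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      pool.shutdownNow();
    }
  }
}
```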


---

[GitHub] carbondata pull request #2646: [CARBONDATA-2874] Support SDK writer as threa...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2646#discussion_r212353990
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---
    @@ -45,17 +45,21 @@
     
       private RowBatch readBatch;
     
    +  private static Object lockObject = new Object();
    +
       private ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10);
     
       public void write(Object[] row) throws InterruptedException {
         if (close) {
           // already might be closed forcefully
           return;
         }
    -    if (!loadBatch.addRow(row)) {
    -      loadBatch.readyRead();
    -      queue.put(loadBatch);
    -      loadBatch = new RowBatch(batchSize);
    +    synchronized (lockObject) {
    --- End diff --
   
    I feel that loadBatch should be a parameter of this function, so that each caller can accumulate rows in a batch in its own thread and then call write to add the full batch to the shared queue.
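A rough sketch of that suggestion on a plain JDK, assuming `List<Object[]>` as a hypothetical stand-in for CarbonData's `RowBatch` (`BatchQueueWriter` and `demo` are also illustrative names). Each producer fills its own batch and hands it over whole; `ArrayBlockingQueue` is already thread-safe, so no extra lock is needed. By contrast, the diff's `lockObject` is `static`, so synchronizing on it serializes writes across every `CarbonOutputIteratorWrapper` instance in the JVM.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BatchQueueWriter {
  // Bounded like the original queue (capacity 10); put() blocks when it is full.
  private final BlockingQueue<List<Object[]>> queue = new ArrayBlockingQueue<>(10);

  // The caller passes a complete batch built in its own thread.
  void write(List<Object[]> batch) throws InterruptedException {
    queue.put(batch);
  }

  List<Object[]> take() throws InterruptedException {
    return queue.take();
  }

  // Demo: `producers` threads each write rowsPerProducer rows in batches of
  // batchSize while the calling thread drains the queue; returns rows consumed.
  static int demo(int producers, int rowsPerProducer, int batchSize) {
    BatchQueueWriter writer = new BatchQueueWriter();
    List<Thread> threads = new ArrayList<>();
    for (int p = 0; p < producers; p++) {
      Thread thread = new Thread(() -> {
        try {
          List<Object[]> batch = new ArrayList<>(batchSize); // thread-local batch
          for (int i = 0; i < rowsPerProducer; i++) {
            batch.add(new Object[] { "robot" + i, i });
            if (batch.size() == batchSize) {
              writer.write(batch);
              batch = new ArrayList<>(batchSize); // never reuse a handed-over batch
            }
          }
          if (!batch.isEmpty()) {
            writer.write(batch); // flush the partial last batch
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      threads.add(thread);
      thread.start();
    }
    int consumed = 0;
    try {
      while (consumed < producers * rowsPerProducer) {
        consumed += writer.take().size();
      }
      for (Thread thread : threads) {
        thread.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return consumed;
  }
}
```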


---

[GitHub] carbondata issue #2646: [CARBONDATA-2874] Support SDK writer as thread safe ...

qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2646
 
    SDV Build Fail, please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6372/



---

[GitHub] carbondata issue #2646: [CARBONDATA-2874] Support SDK writer as thread safe ...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2646
 
    Build Failed with Spark 2.1.0, please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8025/



---

[GitHub] carbondata issue #2646: [CARBONDATA-2874] Support SDK writer as thread safe ...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2646
 
    Build Success with Spark 2.2.1, please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6749/



---

[GitHub] carbondata pull request #2646: [CARBONDATA-2874] Support SDK writer as threa...

qiuchenjian-2
Github user ajantha-bhat closed the pull request at:

    https://github.com/apache/carbondata/pull/2646


---

[GitHub] carbondata issue #2646: [CARBONDATA-2874] Support SDK writer as thread safe ...

qiuchenjian-2
Github user ajantha-bhat commented on the issue:

    https://github.com/apache/carbondata/pull/2646
 
    Closing this, as we handle this in a different way in PR #2653.


---