GitHub user akashrn5 opened a pull request:

https://github.com/apache/carbondata/pull/2310 [WIP] Refactor distributable code and launch a job to clear the segment map and cache from executors

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:
- [ ] Any interfaces changed?
- [ ] Any backward compatibility impacted?
- [ ] Document update required?
- [ ] Testing done
  Please provide details on
  - Whether new unit test cases have been added or why no new tests are required?
  - How it is tested? Please attach test report.
  - Is it a performance related change? Please attach the performance test report.
  - Any additional information to help reviewers in testing this change.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/akashrn5/incubator-carbondata refactor_clear_datamaps

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2310.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2310

----
commit 51e80e76bf24d579e4af0ebdfcbacd2e17bfc11d
Author: akashrn5 <akashnilugal@...>
Date: 2018-05-11T11:27:46Z

    refactor distributable code and lauch job to clear the segmentmap and cache from executor
----

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2310

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5898/

---
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2310

Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4744/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2310

SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4932/

---
Github user akashrn5 commented on the issue:
https://github.com/apache/carbondata/pull/2310

retest this please

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2310

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5907/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2310

Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4754/

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2310#discussion_r188843256

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java ---

@@ -130,6 +130,25 @@ private DataMapExprWrapper chooseDataMap(DataMapLevel level, FilterResolverIntf
     return null;
   }

+  /**
+   * Get all datamaps of the table for clearing purpose
+   */
+  public DataMapExprWrapper getAllDataMapsForClear(CarbonTable carbonTable)
+      throws IOException {
+    List<TableDataMap> allDataMapFG =
+        DataMapStoreManager.getInstance().getAllVisibleDataMap(carbonTable);

--- End diff --

Not just visible, you should get all datamaps

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2310#discussion_r188846192

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---

@@ -364,17 +368,54 @@ public void clearInvalidSegments(CarbonTable carbonTable, List<Segment> segments
    * @param identifier Table identifier
    */
   public void clearDataMaps(AbsoluteTableIdentifier identifier) {
+    CarbonTable carbonTable = getCarbonTable(identifier);
     String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
     List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
+    if (null != carbonTable && tableIndices != null) {
+      try {
+        DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable);
+      } catch (IOException e) {
+        LOGGER.error(e, "clear dataMap job failed");
+        // ignoring the exception
+      }
+    }
     segmentRefreshMap.remove(identifier.uniqueName());
+    clearDataMaps(tableUniqueName);
+    allDataMaps.remove(tableUniqueName);
+  }
+
+  /**
+   * This method returns the carbonTable from identifier
+   * @param identifier
+   * @return
+   */
+  public CarbonTable getCarbonTable(AbsoluteTableIdentifier identifier) {
+    CarbonTable carbonTable = null;
+    try {
+      carbonTable = CarbonTable

--- End diff --

First try getting the table from cache using `CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)`; if you cannot get it, then read from disk

---
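The cache-first, disk-fallback lookup order the reviewer suggests is a common pattern. A minimal generic sketch, not the PR's code: `CacheFirstLookup` and `loadFromDisk` are hypothetical stand-ins for CarbonMetadata's in-memory cache and the schema-file read.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class CacheFirstLookup {
  private final Map<String, String> cache = new ConcurrentHashMap<>();
  private final Function<String, String> loadFromDisk;

  public CacheFirstLookup(Function<String, String> loadFromDisk) {
    this.loadFromDisk = loadFromDisk;
  }

  // Return the cached value if present; otherwise fall back to the
  // (expensive) disk read and remember the result for next time.
  public String lookup(String tableUniqueName) {
    return cache.computeIfAbsent(tableUniqueName, loadFromDisk);
  }
}
```

With this shape, repeated lookups of the same table touch the disk loader only once.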
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2310#discussion_r188846433

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java ---

@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class DataMapUtil {
+
+  private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataMapUtil.class.getName());
+
+  /**
+   * This method set DataMapJob if configured
+   *
+   * @param conf
+   * @throws IOException
+   */
+  public static void setDataMapJobIfConfigured(Configuration conf) throws IOException {
+    String className = "org.apache.carbondata.spark.rdd.SparkDataMapJob";
+    setDataMapJob(conf, createDataMapJob(className));
+  }
+
+  /**
+   * Creates instance for the DataMap Job class
+   *
+   * @param className
+   * @return
+   */
+  public static Object createDataMapJob(String className) {
+    try {
+      return Class.forName(className).getDeclaredConstructors()[0].newInstance();
+    } catch (Exception e) {
+      LOGGER.error(e);
+      return null;
+    }
+  }
+
+  public static void setDataMapJob(Configuration configuration, Object dataMapJob)
+      throws IOException {
+    if (dataMapJob != null) {
+      String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
+      configuration.set(DATA_MAP_DSTR, toString);
+    }
+  }
+
+  public static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
+    String jobString = configuration.get(DATA_MAP_DSTR);
+    if (jobString != null) {
+      return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString);
+    }
+    return null;
+  }
+
+  public static void executeDataMapJobForClearingDataMaps(CarbonTable carbonTable)
+      throws IOException {
+    String dataMapJobClassName = "org.apache.carbondata.spark.rdd.SparkDataMapJob";
+    DataMapJob dataMapJob = (DataMapJob) createDataMapJob(dataMapJobClassName);
+    String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat";
+    SegmentStatusManager ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
+    List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
+    DataMapExprWrapper dataMapExprWrapper = null;
+    if (DataMapStoreManager.getInstance().getAllDataMap(carbonTable).size() > 0) {
+      DataMapChooser dataMapChooser = new DataMapChooser(carbonTable);
+      dataMapExprWrapper = dataMapChooser.getAllDataMapsForClear(carbonTable);
+    } else {
+      return;
+    }
+    DistributableDataMapFormat dataMapFormat =
+        createDataMapJob(carbonTable, dataMapExprWrapper, validSegments, null, className, true);
+    dataMapJob.execute((DistributableDataMapFormat) dataMapFormat, null);

--- End diff --

No need to typecast

---
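The `createDataMapJob` method in the diff above instantiates the Spark job class by name via reflection, so the core module avoids a compile-time dependency on the Spark module. The pattern can be sketched in isolation; `ReflectiveFactory` is an illustrative name, not a class from the PR.

```java
public class ReflectiveFactory {
  // Instantiate a class by name using its first declared constructor,
  // mirroring the Class.forName(...).getDeclaredConstructors()[0].newInstance()
  // call in the diff. Returns null when the class is missing or cannot be
  // constructed, so callers must handle the absent-implementation case.
  public static Object createInstance(String className) {
    try {
      return Class.forName(className).getDeclaredConstructors()[0].newInstance();
    } catch (Exception e) {
      return null;
    }
  }
}
```

Returning null on failure (as the PR does) pushes the burden onto every caller; throwing a checked exception is a stricter alternative when a missing job class should abort the operation.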
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2310#discussion_r188846851

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java ---

+    DistributableDataMapFormat dataMapFormat =
+        createDataMapJob(carbonTable, dataMapExprWrapper, validSegments, null, className, true);

--- End diff --

What if `dataMapExprWrapper` is null?

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2310#discussion_r188847010

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java ---

+  public static void setDataMapJob(Configuration configuration, Object dataMapJob)

--- End diff --

Add comments to all public methods

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2310#discussion_r188847107

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java ---

+  public static DistributableDataMapFormat createDataMapJob(CarbonTable carbonTable,

--- End diff --

Make it private

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2310#discussion_r188847158

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java ---

+  public static Object createDataMapJob(String className) {

--- End diff --

Make it private

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2310#discussion_r188847466

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java ---

@@ -58,14 +57,23 @@
   private List<PartitionSpec> partitions;

+  private DataMapDistributableWrapper distributable;
+
+  private boolean isJobToClearDataMaps = false;
+
   DistributableDataMapFormat(CarbonTable table,
       DataMapExprWrapper dataMapExprWrapper, List<Segment> validSegments,
-      List<PartitionSpec> partitions, String className) {

--- End diff --

Remove the `className`, it seems not used

---
Github user akashrn5 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2310#discussion_r188847767

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java ---

+    DistributableDataMapFormat dataMapFormat =
+        createDataMapJob(carbonTable, dataMapExprWrapper, validSegments, null, className, true);

--- End diff --

dataMapExprWrapper will be null only if the table does not have datamaps; that check is already there

---
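Both sides of this exchange can be satisfied with an explicit guard that bails out early instead of threading a possibly-null wrapper further down. A generic sketch with hypothetical names (`chooseWrapper`, `buildClearJob` are illustrative, not the PR's methods):

```java
public class NullGuardDemo {
  // Hypothetical stand-in for DataMapChooser: returns null when the
  // table has no datamaps to clear.
  static String chooseWrapper(int dataMapCount) {
    return dataMapCount > 0 ? "wrapper" : null;
  }

  // Guard before building the distributable job: returning early makes
  // the "no datamaps" case explicit at the call site.
  static boolean buildClearJob(int dataMapCount) {
    String wrapper = chooseWrapper(dataMapCount);
    if (wrapper == null) {
      return false; // nothing to clear, skip launching the job
    }
    return true; // would construct the job format here
  }
}
```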
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2310#discussion_r188847798

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java ---

@@ -103,10 +111,23 @@ private static FilterResolverIntf getFilterExp(Configuration configuration) thro
   @Override
   public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
       throws IOException, InterruptedException {
-    DataMapDistributableWrapper distributable = (DataMapDistributableWrapper) inputSplit;
-    TableDataMap dataMap = DataMapStoreManager.getInstance()
+    distributable = (DataMapDistributableWrapper) inputSplit;
+    // clear the segmentMap and from cache in executor when there are invalid segments
+    SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier());

--- End diff --

Don't read the table status file in the executor; pass the invalid segments from the driver

---
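The reviewer's suggestion is to compute the invalid-segment list once on the driver and ship it to executors inside the job configuration, rather than have every executor re-read the table status file. A self-contained sketch of that driver-to-executor hand-off; the key name is made up, and a plain `Map` stands in for the Hadoop `Configuration` the PR actually uses.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class SegmentConfig {
  // Hypothetical configuration key, not one defined by the PR.
  static final String INVALID_SEGMENTS = "carbon.input.segments.invalid";

  // Driver side: serialize the invalid segment ids into the job conf
  // before the splits are shipped to executors.
  static void setInvalidSegments(Map<String, String> conf, List<String> segmentIds) {
    conf.put(INVALID_SEGMENTS, String.join(",", segmentIds));
  }

  // Executor side: read the ids back from the conf instead of touching
  // the table status file on the store.
  static List<String> getInvalidSegments(Map<String, String> conf) {
    String value = conf.getOrDefault(INVALID_SEGMENTS, "");
    return value.isEmpty() ? Collections.emptyList() : Arrays.asList(value.split(","));
  }
}
```

This keeps status-file I/O on the driver, which also avoids a thundering herd of executors reading the same file for one job.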
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2310#discussion_r188848603

--- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java ---

@@ -162,14 +162,16 @@ public DataMapBuilder createBuilder(Segment segment, String shardName) {
    * Get all distributable objects of a segmentid
    */
   @Override
-  public List<DataMapDistributable> toDistributable(Segment segment) {
+  public List<DataMapDistributable> toDistributable(Segment segment, boolean isJobToClearDataMaps) {

--- End diff --

Don't change the datamap interface; if `segment.getFilteredIndexShardNames()` is null, then get all.

---
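The alternative the reviewer proposes treats a null shard filter as "all shards", so the existing `toDistributable(Segment)` signature can express the clear-all case without a new boolean parameter. A generic sketch of that null-means-wildcard convention (names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class ShardFilterDemo {
  // A null filter means "no filtering was requested": return every shard.
  // This is the convention that lets the clear-datamaps job reuse the
  // existing interface instead of adding an isJobToClearDataMaps flag.
  static List<String> selectShards(List<String> allShards, Set<String> filteredNames) {
    if (filteredNames == null) {
      return allShards;
    }
    List<String> result = new ArrayList<>();
    for (String shard : allShards) {
      if (filteredNames.contains(shard)) {
        result.add(shard);
      }
    }
    return result;
  }
}
```

The trade-off of null-as-wildcard is that callers must not confuse it with an empty set, which here means "match nothing".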
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2310#discussion_r188848658

--- Diff: datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java ---

@@ -123,7 +123,8 @@ public DataMapBuilder createBuilder(Segment segment, String shardName)
    * @param segment
    * @return
    */
-  @Override public List<DataMapDistributable> toDistributable(Segment segment) {
+  @Override public List<DataMapDistributable> toDistributable(Segment segment,

--- End diff --

Don't change interface

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2310#discussion_r188849156

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java ---

+  public static void setDataMapJobIfConfigured(Configuration conf) throws IOException {

--- End diff --

I don't think this method is required here; let it be there in the input format

---