ravipesala commented on a change in pull request #3177: [CARBONDATA-3337][CARBONDATA-3306] Distributed index server
URL: https://github.com/apache/carbondata/pull/3177#discussion_r279622309 ########## File path: core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java ########## @@ -16,62 +16,115 @@ */ package org.apache.carbondata.core.datamap; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; +import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper; import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope; +import org.apache.carbondata.core.readcommitter.ReadCommittedScope; +import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; +import org.apache.carbondata.core.util.ObjectSerializationUtil; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.log4j.Logger; /** * Input format for 
datamaps, it makes the datamap pruning distributable. */ -public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBlocklet> implements - Serializable { +public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBlocklet> + implements Serializable, Writable { + + private static final transient Logger LOGGER = + LogServiceFactory.getLogService(DistributableDataMapFormat.class.getName()); + + private static final long serialVersionUID = 9189779090091151248L; private CarbonTable table; + private FilterResolverIntf filterResolverIntf; + private DataMapExprWrapper dataMapExprWrapper; private List<Segment> validSegments; - private List<Segment> invalidSegments; + private List<String> invalidSegments; private List<PartitionSpec> partitions; - private DataMapDistributableWrapper distributable; - private boolean isJobToClearDataMaps = false; - DistributableDataMapFormat(CarbonTable table, DataMapExprWrapper dataMapExprWrapper, - List<Segment> validSegments, List<Segment> invalidSegments, List<PartitionSpec> partitions, - boolean isJobToClearDataMaps) { + private ReadCommittedScope readCommittedScope; + + private DataMapLevel dataMapLevel; + + private boolean isFallbackJob = false; + + DistributableDataMapFormat() { + + } + + DistributableDataMapFormat(CarbonTable table, + List<Segment> validSegments, List<String> invalidSegments, boolean isJobToClearDataMaps, + DataMapLevel dataMapLevel) throws IOException { + this(table, null, validSegments, invalidSegments, null, + isJobToClearDataMaps, dataMapLevel, false); + } + + DistributableDataMapFormat(CarbonTable table, FilterResolverIntf filterResolverIntf, + List<Segment> validSegments, List<String> invalidSegments, List<PartitionSpec> partitions, + boolean isJobToClearDataMaps, DataMapLevel dataMapLevel, boolean isFallbackJob) + throws IOException { this.table = table; - this.dataMapExprWrapper = dataMapExprWrapper; + this.filterResolverIntf = filterResolverIntf; this.validSegments = 
validSegments; this.invalidSegments = invalidSegments; this.partitions = partitions; + if (!validSegments.isEmpty()) { + this.readCommittedScope = validSegments.get(0).getReadCommittedScope(); + } this.isJobToClearDataMaps = isJobToClearDataMaps; + this.dataMapLevel = dataMapLevel; + this.isFallbackJob = isFallbackJob; + generateDataMapExpr(); + } + + private void generateDataMapExpr() throws IOException { + if (dataMapLevel == null) { + this.dataMapExprWrapper = DataMapChooser.getDefaultDataMap(table, filterResolverIntf); Review comment: I think there is no need to create dataMapExprWrapper here and serialize it. Better to create it on the executor side. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
Free forum by Nabble | Edit this page |