[ https://issues.apache.org/jira/browse/CARBONDATA-3926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yutao updated CARBONDATA-3926: ------------------------------ Description: [https://github.com/apache/carbondata/blob/master/docs/flink-integration-guide.md] i work with this ,use spark sql create carbondata table and i can see -rw-r--r-- 3 hadoop dc_cbss 2650 2020-07-25 21:06 hdfs://beh/user/dc_cbss/warehouse/testyu.db/userpolicy/Metadata/schema then i write flink app and run with yarn; it work i can see carbonfile in my code defined directory ; val dataTempPath = "hdfs://beh/user/dc_cbss/temp/" [dc_cbss@hive_client_004 yutao]$ hdfs dfs -ls hdfs://beh/user/dc_cbss/temp/ Found 10 items drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:47 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8 drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:35 hdfs://beh/user/dc_cbss/temp/359a873ec9624623af9beae18b630fde drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:44 hdfs://beh/user/dc_cbss/temp/372f6065515e41a5b1d5e01af0a78d61 drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:50 hdfs://beh/user/dc_cbss/temp/3735b94780484f96b211ff6d6974ce3a drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:38 hdfs://beh/user/dc_cbss/temp/8411793f4c5547dc930aacaeea3177cd drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:29 hdfs://beh/user/dc_cbss/temp/915ff23f0d9e4c2dab699d1dcc5a8b4e drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:32 hdfs://beh/user/dc_cbss/temp/bea0bef07d5f47cd92541c69b16aa64e drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:26 hdfs://beh/user/dc_cbss/temp/c42c760144da4f9d83104af270ed46c1 drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:41 hdfs://beh/user/dc_cbss/temp/d8af69e47a5844a3a8ed7090ea13a278 drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:50 hdfs://beh/user/dc_cbss/temp/db6dceb913444c92a3453903fb50f486 [dc_cbss@hive_client_004 yutao]$ hdfs dfs -ls hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/ Found 8 items -rw-r--r-- 3 dc_cbss dc_cbss 3100 2020-07-27 14:45 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/24b93d2ffbc14472b3c0e3d2cd948632_batchno0-0-null-1595831979508.carbonindex -rw-r--r-- 3 dc_cbss dc_cbss 3104 2020-07-27 14:47 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/2da284a3beed4c15a3b60c7849d2da92_batchno0-0-null-1595832075416.carbonindex -rw-r--r-- 3 dc_cbss dc_cbss 3104 2020-07-27 14:47 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/70b01854c2d446889b91d4bc9203587c_batchno0-0-null-1595832123015.carbonindex -rw-r--r-- 3 dc_cbss dc_cbss 3110 2020-07-27 14:46 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/aae80851ef534c9ca6f95669d56ec636_batchno0-0-null-1595832028966.carbonindex -rw-r--r-- 3 dc_cbss dc_cbss 54526 2020-07-27 14:45 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-24b93d2ffbc14472b3c0e3d2cd948632_batchno0-0-null-1595831979508.snappy.carbondata -rw-r--r-- 3 dc_cbss dc_cbss 54710 2020-07-27 14:47 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-2da284a3beed4c15a3b60c7849d2da92_batchno0-0-null-1595832075416.snappy.carbondata -rw-r--r-- 3 dc_cbss dc_cbss 38684 2020-07-27 14:47 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-70b01854c2d446889b91d4bc9203587c_batchno0-0-null-1595832123015.snappy.carbondata -rw-r--r-- 3 dc_cbss dc_cbss 55229 2020-07-27 14:46 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-aae80851ef534c9ca6f95669d56ec636_batchno0-0-null-1595832028966.snappy.carbondata but there no stage_data directory and data not mv to stage_data when flink app commit; i debug code find in CarbonWriter.java file find this method influence it ; protected StageInput uploadSegmentDataFiles(final String localPath, final String remotePath) { if (!this.table.isHivePartitionTable()) { final *{color:#ff0000}File[] files = new File(localPath).listFiles();{color}* if (files == null) { LOGGER.error("files is null" ); return null; } Map<String, Long> fileNameMapLength = new 
HashMap<>(files.length); for (File file : files) { fileNameMapLength.put(file.getName(), file.length()); if (LOGGER.isDebugEnabled()) { LOGGER.debug( "Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start."); } try { CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024); } catch (CarbonDataWriterException exception) { LOGGER.error(exception.getMessage(), exception); throw exception; } if (LOGGER.isDebugEnabled()) { LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end."); } } return new StageInput(remotePath, fileNameMapLength); } else { final List<StageInput.PartitionLocation> partitionLocationList = new ArrayList<>(); final List<String> partitions = new ArrayList<>(); uploadSegmentDataFiles(new File(localPath), remotePath, partitionLocationList, partitions); if (partitionLocationList.isEmpty()) { return null; } else { return new StageInput(remotePath, partitionLocationList); } } the local path is a hdfs file so {color:#ff0000}files is null ;{color} was: [https://github.com/apache/carbondata/blob/master/docs/flink-integration-guide.md] i work with this ,use spark sql create carbondata table and i can see -rw-r--r-- 3 hadoop dc_cbss 2650 2020-07-25 21:06 hdfs://beh/user/dc_cbss/warehouse/testyu.db/userpolicy/Metadata/schema then i write flink app and run with yarn; it work i can see carbonfile in my code defined directory ; val dataTempPath = "hdfs://beh/user/dc_cbss/temp/" [dc_cbss@hive_client_004 yutao]$ hdfs dfs -ls hdfs://beh/user/dc_cbss/temp/ Found 10 items drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:47 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8 drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:35 hdfs://beh/user/dc_cbss/temp/359a873ec9624623af9beae18b630fde drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:44 hdfs://beh/user/dc_cbss/temp/372f6065515e41a5b1d5e01af0a78d61 drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:50 
hdfs://beh/user/dc_cbss/temp/3735b94780484f96b211ff6d6974ce3a drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:38 hdfs://beh/user/dc_cbss/temp/8411793f4c5547dc930aacaeea3177cd drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:29 hdfs://beh/user/dc_cbss/temp/915ff23f0d9e4c2dab699d1dcc5a8b4e drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:32 hdfs://beh/user/dc_cbss/temp/bea0bef07d5f47cd92541c69b16aa64e drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:26 hdfs://beh/user/dc_cbss/temp/c42c760144da4f9d83104af270ed46c1 drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:41 hdfs://beh/user/dc_cbss/temp/d8af69e47a5844a3a8ed7090ea13a278 drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:50 hdfs://beh/user/dc_cbss/temp/db6dceb913444c92a3453903fb50f486 [dc_cbss@hive_client_004 yutao]$ hdfs dfs -ls hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/ Found 8 items -rw-r--r-- 3 dc_cbss dc_cbss 3100 2020-07-27 14:45 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/24b93d2ffbc14472b3c0e3d2cd948632_batchno0-0-null-1595831979508.carbonindex -rw-r--r-- 3 dc_cbss dc_cbss 3104 2020-07-27 14:47 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/2da284a3beed4c15a3b60c7849d2da92_batchno0-0-null-1595832075416.carbonindex -rw-r--r-- 3 dc_cbss dc_cbss 3104 2020-07-27 14:47 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/70b01854c2d446889b91d4bc9203587c_batchno0-0-null-1595832123015.carbonindex -rw-r--r-- 3 dc_cbss dc_cbss 3110 2020-07-27 14:46 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/aae80851ef534c9ca6f95669d56ec636_batchno0-0-null-1595832028966.carbonindex -rw-r--r-- 3 dc_cbss dc_cbss 54526 2020-07-27 14:45 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-24b93d2ffbc14472b3c0e3d2cd948632_batchno0-0-null-1595831979508.snappy.carbondata -rw-r--r-- 3 dc_cbss dc_cbss 54710 2020-07-27 14:47 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-2da284a3beed4c15a3b60c7849d2da92_batchno0-0-null-1595832075416.snappy.carbondata -rw-r--r-- 3 dc_cbss dc_cbss 38684 2020-07-27 14:47 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-70b01854c2d446889b91d4bc9203587c_batchno0-0-null-1595832123015.snappy.carbondata -rw-r--r-- 3 dc_cbss dc_cbss 55229 2020-07-27 14:46 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-aae80851ef534c9ca6f95669d56ec636_batchno0-0-null-1595832028966.snappy.carbondata but there no stage_data directory and data not mv to stage_data when flink app commit; i debug code find in CarbonWriter.java file protected StageInput uploadSegmentDataFiles(final String localPath, final String remotePath) { if (!this.table.isHivePartitionTable()) { final *{color:#FF0000}File[] files = new File(localPath).listFiles();{color}* if (files == null) { LOGGER.error("files is null" ); return null; } Map<String, Long> fileNameMapLength = new HashMap<>(files.length); for (File file : files) { fileNameMapLength.put(file.getName(), file.length()); if (LOGGER.isDebugEnabled()) { LOGGER.debug( "Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start."); } try { CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024); } catch (CarbonDataWriterException exception) { LOGGER.error(exception.getMessage(), exception); throw exception; } if (LOGGER.isDebugEnabled()) { LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end."); } } return new StageInput(remotePath, fileNameMapLength); } else { final List<StageInput.PartitionLocation> partitionLocationList = new ArrayList<>(); final List<String> partitions = new ArrayList<>(); uploadSegmentDataFiles(new File(localPath), remotePath, partitionLocationList, partitions); if (partitionLocationList.isEmpty()) { return null; } else { return new StageInput(remotePath, partitionLocationList); } } the 
local path is a hdfs file so {color:#FF0000}files is null ;{color} > flink-integration i find it can't move file to stage_data directory > -------------------------------------------------------------------- > > Key: CARBONDATA-3926 > URL: https://issues.apache.org/jira/browse/CARBONDATA-3926 > Project: CarbonData > Issue Type: Bug > Components: flink-integration > Affects Versions: 2.0.0, 2.0.1 > Environment: my hadoop is cdh-5.16.1 and spark 2.3.3, flink 1.10.1,hive 1.1.0 > Reporter: yutao > Priority: Critical > Fix For: 2.1.0 > > > [https://github.com/apache/carbondata/blob/master/docs/flink-integration-guide.md] > i work with this ,use spark sql create carbondata table and i can see > -rw-r--r-- 3 hadoop dc_cbss 2650 2020-07-25 21:06 hdfs://beh/user/dc_cbss/warehouse/testyu.db/userpolicy/Metadata/schema > then i write flink app and run with yarn; > it work i can see carbonfile in my code defined directory ; > val dataTempPath = "hdfs://beh/user/dc_cbss/temp/" > [dc_cbss@hive_client_004 yutao]$ hdfs dfs -ls hdfs://beh/user/dc_cbss/temp/ > Found 10 items > drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:47 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8 > drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:35 hdfs://beh/user/dc_cbss/temp/359a873ec9624623af9beae18b630fde > drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:44 hdfs://beh/user/dc_cbss/temp/372f6065515e41a5b1d5e01af0a78d61 > drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:50 hdfs://beh/user/dc_cbss/temp/3735b94780484f96b211ff6d6974ce3a > drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:38 hdfs://beh/user/dc_cbss/temp/8411793f4c5547dc930aacaeea3177cd > drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:29 hdfs://beh/user/dc_cbss/temp/915ff23f0d9e4c2dab699d1dcc5a8b4e > drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:32 hdfs://beh/user/dc_cbss/temp/bea0bef07d5f47cd92541c69b16aa64e > drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:26 hdfs://beh/user/dc_cbss/temp/c42c760144da4f9d83104af270ed46c1 > drwxr-xr-x - dc_cbss dc_cbss 
0 2020-07-27 14:41 hdfs://beh/user/dc_cbss/temp/d8af69e47a5844a3a8ed7090ea13a278 > drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:50 hdfs://beh/user/dc_cbss/temp/db6dceb913444c92a3453903fb50f486 > [dc_cbss@hive_client_004 yutao]$ hdfs dfs -ls hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/ > Found 8 items > -rw-r--r-- 3 dc_cbss dc_cbss 3100 2020-07-27 14:45 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/24b93d2ffbc14472b3c0e3d2cd948632_batchno0-0-null-1595831979508.carbonindex > -rw-r--r-- 3 dc_cbss dc_cbss 3104 2020-07-27 14:47 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/2da284a3beed4c15a3b60c7849d2da92_batchno0-0-null-1595832075416.carbonindex > -rw-r--r-- 3 dc_cbss dc_cbss 3104 2020-07-27 14:47 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/70b01854c2d446889b91d4bc9203587c_batchno0-0-null-1595832123015.carbonindex > -rw-r--r-- 3 dc_cbss dc_cbss 3110 2020-07-27 14:46 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/aae80851ef534c9ca6f95669d56ec636_batchno0-0-null-1595832028966.carbonindex > -rw-r--r-- 3 dc_cbss dc_cbss 54526 2020-07-27 14:45 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-24b93d2ffbc14472b3c0e3d2cd948632_batchno0-0-null-1595831979508.snappy.carbondata > -rw-r--r-- 3 dc_cbss dc_cbss 54710 2020-07-27 14:47 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-2da284a3beed4c15a3b60c7849d2da92_batchno0-0-null-1595832075416.snappy.carbondata > -rw-r--r-- 3 dc_cbss dc_cbss 38684 2020-07-27 14:47 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-70b01854c2d446889b91d4bc9203587c_batchno0-0-null-1595832123015.snappy.carbondata > -rw-r--r-- 3 dc_cbss dc_cbss 55229 2020-07-27 14:46 hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-aae80851ef534c9ca6f95669d56ec636_batchno0-0-null-1595832028966.snappy.carbondata > > but there no stage_data directory and data not mv to stage_data when flink app commit; > i debug 
code and find that this method in the CarbonWriter.java file causes it: > protected StageInput uploadSegmentDataFiles(final String localPath, final String remotePath) { > if (!this.table.isHivePartitionTable()) { > final *{color:#ff0000}File[] files = new File(localPath).listFiles();{color}* > if (files == null) > { LOGGER.error("files is null" ); return null; } > Map<String, Long> fileNameMapLength = new HashMap<>(files.length); > for (File file : files) { > fileNameMapLength.put(file.getName(), file.length()); > if (LOGGER.isDebugEnabled()) > { LOGGER.debug( "Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start."); } > try > { CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024); } > catch (CarbonDataWriterException exception) > { LOGGER.error(exception.getMessage(), exception); throw exception; } > if (LOGGER.isDebugEnabled()) > { LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end."); } > } > return new StageInput(remotePath, fileNameMapLength); > } else { > final List<StageInput.PartitionLocation> partitionLocationList = new ArrayList<>(); > final List<String> partitions = new ArrayList<>(); > uploadSegmentDataFiles(new File(localPath), remotePath, partitionLocationList, partitions); > if (partitionLocationList.isEmpty()) > { return null; } > else > { return new StageInput(remotePath, partitionLocationList); } > } > the local path is an HDFS path, which java.io.File cannot list, so {color:#ff0000}files is null;{color} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |