[GitHub] carbondata pull request #1508: [CARBONDATA-1738] Block direct insert/load on...

[GitHub] carbondata issue #1508: [CARBONDATA-1738] [PreAgg] Block direct insert/load ...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1508
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1477/



---

[GitHub] carbondata issue #1508: [CARBONDATA-1738] [PreAgg] Block direct insert/load ...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1508
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1479/



---

[GitHub] carbondata issue #1508: [CARBONDATA-1738] [PreAgg] Block direct insert/load ...

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1508
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1896/



---

[GitHub] carbondata issue #1508: [CARBONDATA-1738] [PreAgg] Block direct insert/load ...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1508
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1482/



---

[GitHub] carbondata issue #1508: [CARBONDATA-1738] [PreAgg] Block direct insert/load ...

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1508
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1900/



---

[GitHub] carbondata issue #1508: [CARBONDATA-1738] [PreAgg] Block direct insert/load ...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1508
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1485/



---

[GitHub] carbondata issue #1508: [CARBONDATA-1738] [PreAgg] Block direct insert/load ...

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1508
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1903/



---

[GitHub] carbondata issue #1508: [CARBONDATA-1738] [PreAgg] Block direct insert/load ...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1508
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1498/



---

[GitHub] carbondata issue #1508: [CARBONDATA-1738] [PreAgg] Block direct insert/load ...

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1508
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1915/



---

[GitHub] carbondata issue #1508: [CARBONDATA-1738] [PreAgg] Block direct insert/load ...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1508
 
    Build Failed with Spark 1.6, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/392/



---

[GitHub] carbondata issue #1508: [CARBONDATA-1738] [PreAgg] Block direct insert/load ...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1508
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1522/



---

[GitHub] carbondata issue #1508: [CARBONDATA-1738] [PreAgg] Block direct insert/load ...

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1508
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1924/



---

[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1508#discussion_r153722006
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala ---
    @@ -221,7 +221,6 @@ object DataLoadingUtil {
         ValidateUtil.validateDateTimeFormat(timestampformat, "TimestampFormat")
         ValidateUtil.validateDateTimeFormat(dateFormat, "DateFormat")
         ValidateUtil.validateSortScope(table, sort_scope)
    -
    --- End diff --
   
     Don't change the file unnecessarily.


---

[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1508#discussion_r153722623
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
    @@ -94,10 +95,21 @@ case class CreatePreAggregateTableCommand(
           dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
           // updating the parent table about child table
           PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
    -      val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
    -      if (loadAvailable) {
    -        sparkSession.sql(
    -          s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString")
    +      val availableLoads = PreAggregateUtil.checkMainTableLoad(parentTable)
    +      if (availableLoads) {
    +        val headers = childSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName)
    --- End diff --
   
     Move this line down.


---

[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1508#discussion_r153722722
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
    @@ -94,10 +95,21 @@ case class CreatePreAggregateTableCommand(
           dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
           // updating the parent table about child table
           PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
    -      val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
    -      if (loadAvailable) {
    -        sparkSession.sql(
    -          s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString")
    +      val availableLoads = PreAggregateUtil.checkMainTableLoad(parentTable)
    +      if (availableLoads) {
    +        val headers = childSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName)
    +          .mkString(",")
    +        val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
    --- End diff --
   
     Move it down, after the opening parenthesis.


---
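For reference, a minimal sketch of the layout being requested here: the chained call is wrapped after the opening parenthesis so the line stays within length limits. The parser call and the trailing .drop("preAggLoad") come from the diff; passing queryString to addPreAggLoadFunction is an assumption for illustration.

    // sketch only: break the argument after the opening parenthesis
    val childDataFrame = sparkSession.sql(
        new CarbonSpark2SqlParser().addPreAggLoadFunction(s"$queryString "))
      .drop("preAggLoad")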

[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1508#discussion_r153722923
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -49,13 +50,54 @@ object LoadPostAggregateListener extends OperationEventListener {
                                     carbonLoadModel.getTableName, "false")
             val childTableName = dataMapSchema.getRelationIdentifier.getTableName
             val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
    -        val selectQuery = dataMapSchema.getProperties.get("CHILD_SELECT QUERY")
    -        sparkSession.sql(s"insert into $childDatabaseName.$childTableName $selectQuery")
    +        val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
    +          .addPreAggLoadFunction(s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } "))
    +          .drop("preAggLoad")
    +        val headers = dataMapSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName)
    --- End diff --
   
     Move this down.


---

[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1508#discussion_r153723385
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -49,13 +50,54 @@ object LoadPostAggregateListener extends OperationEventListener {
                                     carbonLoadModel.getTableName, "false")
             val childTableName = dataMapSchema.getRelationIdentifier.getTableName
             val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
    -        val selectQuery = dataMapSchema.getProperties.get("CHILD_SELECT QUERY")
    -        sparkSession.sql(s"insert into $childDatabaseName.$childTableName $selectQuery")
    +        val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
    +          .addPreAggLoadFunction(s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } "))
    +          .drop("preAggLoad")
    +        val headers = dataMapSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName)
    +          .mkString(",")
    +        try {
    +          LoadTableCommand(Some(childDatabaseName),
    +            childTableName,
    +            null,
    +            Nil,
    +            Map("fileheader" -> headers),
    +            isOverwriteTable = false,
    +            dataFrame = Some(childDataFrame),
    +            internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"))
    +            .run(sparkSession)
    +        } finally {
    +          CarbonSession.threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
    +                                    carbonLoadModel.getDatabaseName + "." +
    +                                    carbonLoadModel.getTableName)
    +          CarbonSession.threadUnset(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
    +                                    carbonLoadModel.getDatabaseName + "." +
    +                                    carbonLoadModel.getTableName)
    +        }
           }
         }
       }
     }
     
    +object LoadPreAggregateTablePreListener extends OperationEventListener {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    +    val loadEvent = event.asInstanceOf[LoadTablePreExecutionEvent]
    +    val carbonLoadModel = loadEvent.carbonLoadModel
    +    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val isInternalLoadCall = carbonLoadModel.isAggLoadRequest
    +    if (table.isChildDataMap && !isInternalLoadCall) {
    +      throw new UnsupportedOperationException(
    +        "Cannot insert/load data directly into pre-aggregate table")
    +    }
    +
    --- End diff --
   
     Remove this blank line.


---
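Taken together, the changes above are what enforce the PR title: a pre-aggregate child table can only be written through the internal load triggered on its parent. A small usage sketch of the resulting behaviour follows; the table, datamap, and column names are hypothetical, and only the exception message is taken from the diff.

    // hypothetical names, for illustration only
    spark.sql("CREATE TABLE maintable (name STRING, price INT) STORED BY 'carbondata'")
    spark.sql("CREATE DATAMAP agg_price ON TABLE maintable USING 'preaggregate' AS " +
              "SELECT name, SUM(price) FROM maintable GROUP BY name")

    // loading the parent still cascades into the child table via LoadPostAggregateListener
    spark.sql("INSERT INTO maintable SELECT 'a', 100")

    // writing to the child table directly is rejected by LoadPreAggregateTablePreListener
    spark.sql("INSERT INTO maintable_agg_price SELECT 'a', 100")
    // => UnsupportedOperationException: Cannot insert/load data directly into pre-aggregate table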

[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1508#discussion_r153723567
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -253,8 +260,8 @@ object PreAggregateUtil {
               carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
               parentTableName,
               parentDatabaseName, parentTableId = parentTableId)
    -      case _ =>
    -        throw new MalformedCarbonCommandException("Un-Supported Aggregation Type")
    +      case a@_ =>
    --- End diff --
   
     Keep it as case others instead of case a@_.


---
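A minimal sketch of the requested change, keeping the exception thrown by this arm; interpolating the matched value into the message is an assumption, not part of the diff.

    // suggested: name the fall-through binding instead of using a@_
    case others =>
      throw new MalformedCarbonCommandException(
        s"Un-Supported Aggregation Type: $others")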

[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1508#discussion_r153723895
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -751,6 +754,58 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       }
     }
     
    +object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = {
    +
    +    plan transform {
    +
    +      case aggregate@Aggregate(_, aExp, _) if validateAggregateExpressions(aExp) =>
    +        val newExpressions = aExp.flatMap {
    +          case alias@Alias(attrExpression: AggregateExpression, _) =>
    +            attrExpression.aggregateFunction match {
    +              case Average(attr: AttributeReference) =>
    +                Seq(Alias(attrExpression
    +                  .copy(aggregateFunction = Sum(attr),
    +                    resultId = NamedExpression.newExprId), attr.name + "_sum")(),
    +                  Alias(attrExpression
    +                    .copy(aggregateFunction = Count(attr),
    +                      resultId = NamedExpression.newExprId), attr.name + "_count")())
    +              case Average(cast@Cast(attr: AttributeReference, _)) =>
    +                Seq(Alias(attrExpression
    +                  .copy(aggregateFunction = Sum(cast),
    +                    resultId = NamedExpression.newExprId),
    +                  attr.name + "_sum")(),
    +                  Alias(attrExpression
    +                    .copy(aggregateFunction = Count(cast),
    +                      resultId = NamedExpression.newExprId), attr.name + "_count")())
    --- End diff --
   
    Please format it properly


---
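One possible way to tidy this arm, shown only as a sketch and not as the PR's code: factor the duplicated Alias construction into a helper so each Average case fits on one line. The helper name toSumAndCount is hypothetical; the Catalyst calls mirror those already in the diff.

    // hypothetical helper: build the "<name>_sum" and "<name>_count" aliases once
    private def toSumAndCount(
        aggExpr: AggregateExpression,
        child: Expression,
        name: String): Seq[NamedExpression] = Seq(
      Alias(aggExpr.copy(aggregateFunction = Sum(child),
        resultId = NamedExpression.newExprId), name + "_sum")(),
      Alias(aggExpr.copy(aggregateFunction = Count(child),
        resultId = NamedExpression.newExprId), name + "_count")())

    // the two Average arms then reduce to:
    //   case Average(attr: AttributeReference)                 => toSumAndCount(attrExpression, attr, attr.name)
    //   case Average(cast @ Cast(attr: AttributeReference, _)) => toSumAndCount(attrExpression, cast, attr.name)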

[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1508#discussion_r153724030
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -751,6 +754,58 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       }
     }
     
    +object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = {
    +
    +    plan transform {
    +
    +      case aggregate@Aggregate(_, aExp, _) if validateAggregateExpressions(aExp) =>
    +        val newExpressions = aExp.flatMap {
    +          case alias@Alias(attrExpression: AggregateExpression, _) =>
    +            attrExpression.aggregateFunction match {
    +              case Average(attr: AttributeReference) =>
    +                Seq(Alias(attrExpression
    +                  .copy(aggregateFunction = Sum(attr),
    +                    resultId = NamedExpression.newExprId), attr.name + "_sum")(),
    +                  Alias(attrExpression
    +                    .copy(aggregateFunction = Count(attr),
    +                      resultId = NamedExpression.newExprId), attr.name + "_count")())
    +              case Average(cast@Cast(attr: AttributeReference, _)) =>
    +                Seq(Alias(attrExpression
    +                  .copy(aggregateFunction = Sum(cast),
    +                    resultId = NamedExpression.newExprId),
    +                  attr.name + "_sum")(),
    +                  Alias(attrExpression
    +                    .copy(aggregateFunction = Count(cast),
    +                      resultId = NamedExpression.newExprId), attr.name + "_count")())
    +              case _ => Seq(alias)
    +            }
    +          case namedExpr: NamedExpression => Seq(namedExpr)
    +        }
    +        aggregate.copy(aggregateExpressions = newExpressions.asInstanceOf[Seq[NamedExpression]])
    +      case plan: LogicalPlan => plan
    +    }
    +  }
    +
    +  /**
    +   * Called by PreAggregateLoadingRules to validate if plan is valid for applying rules or not.
    +   * If the plan has PreAggLoad i.e Loading UDF and does not have PreAgg i.e Query UDF then it is
    +   * valid.
    +   *
    +   * @param namedExpression
    +   * @return
    +   */
    +  private def validateAggregateExpressions(namedExpression: Seq[NamedExpression]): Boolean = {
    +    val filteredExpressions = namedExpression.filterNot(_.isInstanceOf[UnresolvedAlias])
    +    filteredExpressions
    +      .exists {
    +        expr => !expr.name.equalsIgnoreCase("PreAgg") &&
    +                        expr.name.equalsIgnoreCase("preAggLoad")
    --- End diff --
   
     Format this properly.


---
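For the predicate flagged above, one possible layout that keeps the same logic, sketch only: one condition per line, with a comment carrying the intent stated in the scaladoc (the plan qualifies when it contains the loading UDF preAggLoad but not the query UDF PreAgg).

    private def validateAggregateExpressions(namedExpressions: Seq[NamedExpression]): Boolean = {
      val filtered = namedExpressions.filterNot(_.isInstanceOf[UnresolvedAlias])
      // true only when an expression is the loading marker and not the query marker
      filtered.exists { expr =>
        !expr.name.equalsIgnoreCase("PreAgg") &&
        expr.name.equalsIgnoreCase("preAggLoad")
      }
    }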