
Conversation

@aokolnychyi (Contributor)

What changes were proposed in this pull request?

This PR adds DS v2 APIs for handling row-level operations for data sources that support deltas of rows.

Why are the changes needed?

These changes are part of the approved SPIP in SPARK-35801.

Does this PR introduce any user-facing change?

Yes, this PR adds new DS v2 APIs per design doc.

How was this patch tested?

Tests will be part of the implementation PR.

@github-actions bot added the SQL label Sep 26, 2022
@aokolnychyi (Contributor Author)

I did not add any doc for inherited methods as it would mostly overlap with the parent doc. I could add a few sentences and reference the parent doc, though.

@aokolnychyi (Contributor Author), Sep 26, 2022

I added a default implementation to match the parent interface.
In the future, we may also override toStreaming.

@aokolnychyi (Contributor Author)

I had to override to avoid inheriting the default implementation from the parent interface.

@aokolnychyi (Contributor Author)

@cloud-fan @rdblue @huaxingao @dongjoon-hyun @sunchao @viirya, could you take a look? This is the API from the design doc we discussed earlier.

I have also created PR #38005 that shows how this API will be consumed.

```
 * the schema of the input metadata from Spark to data source.
 */
default StructType metadataSchema() {
  return null;
```
@aokolnychyi (Contributor Author)

The default implementation is purely for compatibility.

@amaliujia (Contributor)

Nit: I generally want to avoid returning null. Even throwing an exception is better than returning null. But this is probably just my opinion.

@aokolnychyi (Contributor Author)

I am open to discussing what value should indicate the absence of a row ID or metadata schema.

@aokolnychyi (Contributor Author), Sep 27, 2022

I see Optional used in a few other places in the connector API, so I could switch to that.
Using Optional is always debatable, though, so it is usually up to a particular project to decide.

@amaliujia (Contributor), Sep 28, 2022

Let me see how the implementation uses this API, if there is one.

The concern is the case where an implementation forgets to override this and silently inherits the default null; NPEs usually come from that.

@cloud-fan (Contributor)

Returning null is pretty risky, as you don't know when/where an NPE will happen. I'd prefer throwing UnsupportedOperationException by default.

@aokolnychyi (Contributor Author)

@amaliujia @cloud-fan, we can do something like this.

```
/**
 * the schema of the ID columns from Spark to data source.
 */
default Optional<StructType> rowIdSchema() {
  throw new UnsupportedOperationException(
      getClass().getName() + " does not implement rowIdSchema");
}

/**
 * the schema of the input metadata from Spark to data source.
 */
default Optional<StructType> metadataSchema() {
  throw new UnsupportedOperationException(
      getClass().getName() + " does not implement metadataSchema");
}
```

Now the question is what to report in schema() for delta-based DELETE operations, where we do not pass the row itself, only the row ID and metadata. One option is to report an empty struct, but let me know if you have other ideas.

The way I was approaching it initially:

- schema() -> the row schema for new records (only MERGE adds new records)
- rowIdSchema() -> the schema of the row ID passed to the data source to mark a record as deleted/updated
- metadataSchema() -> the schema of projected metadata columns that carry extra information about the row being deleted/updated
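To make this concrete, here is a minimal sketch of what a connector-side implementation of these three methods could look like for a source that deletes by primary key and also receives the source file, following the Optional-based shape proposed above. The class name, column names, and types are illustrative assumptions, not part of this PR.

```
import java.util.Optional;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Hypothetical write that identifies rows by a primary key (pk) and
// receives the source file (_file) as extra metadata.
class KeyBasedDeltaWriteSketch {

  // Delta-based DELETE passes no full rows, so the row schema is an empty struct.
  public StructType schema() {
    return new StructType();
  }

  // Row ID schema: a single primary key column used to mark records as deleted/updated.
  public Optional<StructType> rowIdSchema() {
    StructField pk = new StructField("pk", DataTypes.LongType, false, Metadata.empty());
    return Optional.of(new StructType(new StructField[]{pk}));
  }

  // Metadata schema: extra projected columns, here the file the old record came from.
  public Optional<StructType> metadataSchema() {
    StructField file =
        new StructField("_file", DataTypes.StringType, true, Metadata.empty());
    return Optional.of(new StructType(new StructField[]{file}));
  }
}
```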

@dongjoon-hyun (Member)

Please proceed to update this PR with your proposed approach, @aokolnychyi.

@aokolnychyi (Contributor Author)

I've updated this PR and the reference implementation in PR #38005.

@cloud-fan (Contributor)

Sorry for the delay! Will take a look this week.

@amaliujia (Contributor) left a comment

A general question about the PR:

I see that the purpose of this PR is to add APIs. But does it make sense to also have a reference implementation, or some test code that implements the APIs, to exercise them a bit?

@aokolnychyi (Contributor Author)

@amaliujia, I have linked #38005 that adds test coverage and implementation. I've split this work to reduce the scope of each PR and simplify reviewing. Also, converging on the implementation usually takes more time so having smaller chunks of work helps to make some progress.

@amaliujia (Contributor)

Thanks for the link to the implementation!

```
 * @param id a row ID to delete
 * @throws IOException if failure happens during disk/network IO like writing files
 */
void delete(T metadata, T id) throws IOException;
```
@cloud-fan (Contributor)

Sorry, it's been a while and I can't recall all the context. Do we have a concrete example where we need both metadata columns and ID columns to delete/update rows?

@aokolnychyi (Contributor Author)

No problem. It has been a while for me too.

A partition tuple can be one example. A data source may be able to handle operations on a primary key, but knowing the old record's partition can help the data source encode a delete efficiently. In that case, the partition tuple is not really part of the row ID but rather extra metadata that allows encoding the delete.

Another example is a metadata column that would carry the old version of the row.
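As a hedged illustration of that point, the fragment below shows how a writer could use both arguments of delete(metadata, id). The class name, column ordinals, and buffering are assumptions; only the delete(T metadata, T id) signature comes from the proposed API.

```
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.catalyst.InternalRow;

// Sketch only: metadata carries the old record's partition (or _file) value,
// id carries the primary key; together they let the sink encode the delete
// next to the affected data instead of searching for it.
class PartitionAwareDeleteSketch {

  private final List<String> bufferedDeletes = new ArrayList<>();

  public void delete(InternalRow metadata, InternalRow id) throws IOException {
    String partition = metadata.getString(0);  // projected metadata column
    long pk = id.getLong(0);                   // row ID column
    bufferedDeletes.add(partition + "/" + pk); // flushed by the real writer on commit
  }
}
```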

```
/**
 * the schema of the input data from Spark to data source.
 */
StructType schema();
```
@aokolnychyi (Contributor Author), Oct 10, 2022

In delta-based DELETE operations, I plan to return an empty struct type here. See the discussion above for context.
Any ideas are welcome.

@dongjoon-hyun (Member) left a comment

+1, LGTM from my side. Thank you, @aokolnychyi.

Please review once more if you have some time, @cloud-fan, @amaliujia, @viirya, @sunchao, @huaxingao.

@amaliujia (Contributor)

LGTM!

@huaxingao (Contributor)

LGTM

@dongjoon-hyun (Member) commented Oct 13, 2022

Thank you, @aokolnychyi, @cloud-fan, @amaliujia, @viirya, @sunchao, @huaxingao.
Merged to master for Apache Spark 3.4.0.

```
 * @since 3.4.0
 */
@Experimental
public interface DeltaWriter<T> extends DataWriter<T> {
```
@cloud-fan (Contributor)

One more comment: we use the type parameter T because the data can be in either row or columnar format. However, I think this new delta writer can only work with rows, so we should probably make DeltaWriter extend DataWriter<InternalRow>.

@aokolnychyi (Contributor Author)

Let me take a closer look on Monday.

@aokolnychyi (Contributor Author)

@cloud-fan, could you elaborate a bit on why you think we can only work with rows here?

@cloud-fan (Contributor)

Because we have void delete(T metadata, T id) throws IOException. Are we going to perform a delete with a batch of metadata and ID rows?

@aokolnychyi (Contributor Author)

Well, I can see us passing batches of deletes and metadata at some point in the future. We can assume values at the same index will belong to the same row.
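Purely as a speculative sketch of that future direction (none of this is in the PR), a batched variant could align the two arguments by index, as discussed above. The class, method shape, and column ordinals are assumptions.

```
import java.io.IOException;

import org.apache.spark.sql.vectorized.ColumnarBatch;

// Hypothetical batched delete: the metadata and id batches are assumed to be aligned,
// so values at the same index describe the same deleted row.
class BatchedDeleteSketch {

  public void delete(ColumnarBatch metadata, ColumnarBatch ids) throws IOException {
    for (int i = 0; i < ids.numRows(); i++) {
      String file = metadata.column(0).getUTF8String(i).toString();
      long pk = ids.column(0).getLong(i);
      markDeleted(file, pk);
    }
  }

  private void markDeleted(String file, long pk) {
    // a real sink would buffer or encode the delete here
  }
}
```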

SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
…operations

### What changes were proposed in this pull request?

This PR adds DS v2 APIs for handling row-level operations for data sources that support deltas of rows.

### Why are the changes needed?

These changes are part of the approved SPIP in SPARK-35801.

### Does this PR introduce _any_ user-facing change?

Yes, this PR adds new DS v2 APIs per [design doc](https://docs.google.com/document/d/12Ywmc47j3l2WF4anG5vL4qlrhT2OKigb7_EbIKhxg60).

### How was this patch tested?

Tests will be part of the implementation PR.

Closes apache#38004 from aokolnychyi/spark-40551.

Lead-authored-by: Anton Okolnychyi <[email protected]>
Co-authored-by: aokolnychyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request Jan 19, 2023
…sed sources

### What changes were proposed in this pull request?

This PR adds support for DELETE commands for delta-based sources and implements the API added in PR #38004.

Suppose there is a data source capable of encoding deletes using primary keys (`pk`). Also, this data source requires knowing the source file, which can be projected via a metadata column (`_file`), to encode deletes efficiently.

As an example, suppose there is a table with 1 file that contains 3 records.
```
pk | salary | department
------------------------
1, 100, hr
2, 50, software
3, 150, hardware
```

This PR would rewrite `DELETE FROM t WHERE salary <= 100` to perform the following steps:
- find records that need to be removed by scanning the table with the delete condition;
- project required columns to encode deletes (`pk` + `_file` in our case);
- form a set of changes by adding a new `__row_operation` column with value `delete`;
- write the set of changes to the table using `WriteDeltaExec` and `DeltaWriter`;

The set of changes to encode for the DELETE statement above will look like this:

```
__row_operation | pk | _file
----------------------------
delete, 1, file_a.parquet
delete, 2, file_a.parquet
```

As opposed to the group-based deletes that Spark already supports, the new logic can discard records that did not change in a file that had matches (i.e. the record with `pk = 3` did not match the condition and is not included in the change set).

Then `WriteDeltaExec` will handle this set of changes and translate them into `delete()` calls on `DeltaWriter`. In the future, this logic will be extended to also cover UPDATEs and MERGEs by adding `update` and `insert` row operations to the set of changes supported by `WriteDeltaExec`.
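For the change set above, the calls a delta writer receives boil down to something like the following sketch. The helper, row construction, and class name are illustrative; only `DeltaWriter.delete(metadata, id)` comes from the API added in #38004, and the import path assumes the interface lives in `org.apache.spark.sql.connector.write` as in this PR.

```
import java.io.IOException;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.connector.write.DeltaWriter;
import org.apache.spark.unsafe.types.UTF8String;

class DeltaDeleteCallsSketch {

  // metadata rows carry _file, id rows carry pk, matching the change set above
  static void encodeDeletes(DeltaWriter<InternalRow> writer) throws IOException {
    writer.delete(row(UTF8String.fromString("file_a.parquet")), row(1L)); // delete, 1, file_a.parquet
    writer.delete(row(UTF8String.fromString("file_a.parquet")), row(2L)); // delete, 2, file_a.parquet
  }

  private static InternalRow row(Object value) {
    return new GenericInternalRow(new Object[]{value});
  }
}
```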

### Why are the changes needed?

These changes are needed as per the SPIP in SPARK-35801.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This PR comes with tests.

Closes #38005 from aokolnychyi/spark-40550-proto.

Authored-by: aokolnychyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>