Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Branch 2.1 #19187
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Uh oh!
There was an error while loading. Please reload this page.
Branch 2.1 #19187
Changes from 1 commit
dd1abefe642a07706d6c14d0402971b6eac502c927b3fd36aa3d5300ff5818b7b5ea00e580bb0173c23806e77e0fe4fcc5a3b6751ef4fb7ec5a7cb0328b2292968d8c5db23477fe3543c8113b0f837ced7763b0b8ee4ec86c3539988c43f4b9ab4c0db7adb6252dd0555958bc6e3abedb083ec57c371dec3316746edf02a9a890b521afc45d30238f43084b366a7ca26da6a27ed9aaa397866e120a432904fbb9e4b4c3bf947c0cdd887f75f719cccbbe0d8c27347b53a7591a1237aaeaccbed7da04d45664c979ca7a7e8fd6c6d5711addd551b7bdcbc37003b648a60ba9ecb320eff1f6c1ad23457c3278cc57200859e10c140c12a76e24ffe65b0a59cc36f0d50fd5a2ad43e481a73f9833c68c460804545782a0ce84580ebca606225469d032d04b977ff710b5555fb7083780f606b60f690af8bf21d205d40c4c7b18a88c88a5c18b6c9dfdd2aa04428f30abb95c4d2b83277ed3756f997faf960e892f0b01d989434b6d348e40561914bcb7d6fd2e406e669dd702b165d4964dbe3095480f8c1b3e103ff546a1b2ebe3cec18968eaceca1441077700eaf9546da00c1248efc72dc2b85e05fb81a417791120fc242cc658b35843a7fca1a73046bc7304e489c1f3b26f2c2f40e44d8eb71b803a42c046e212db2970d9dbb6d1b7e0dddabe36c2f98ae548bca7ce26f715c02ed19cf2a3e50eefa11a47aad057db9517c622d7a83808b47a4c1ebc171bf659e5dc8266e7a8ffb0351aba50580d99b49b427966565990fc2d47e1a359382c267aca58460b096696ad05131b0a868b4a15915588d10b0f6179f5372a7f5da704b249a1112c6f7a91a112c937e50f28df8e0978969786eabdc08ab92a71a66e89d5795de46762969e914b6a9dba35c6be06d936e326de4c53fe79e9804b3c3a986bf5ef076f4538c913adc0f2f68631c3302e87015f6f7fc23474f6fccf6e6adccebd72f438f37c54640086dade85f772a9b90b25a7dafab85503cc18b58a8a37ee0e74ea89046662f2b80915a2010ebb3b8a44c1187799f358923bac6b37c861a98d5df8fd3b4bcaf06cf12883e6750db30d6b70126f4f34083adb0d995dac3ecef248f1ca697f7b63b5e2bfd52c28462ca4d2aaa9efce4caf32b3ac2069394987988520d7c258ca4078f7cdfb31b302d93e45b734b1445634fad444cca19b749b66f366fb2394ae23d3be4d5769753041eccb6a8a726ae4e8aee7696ebFile filter
Filter by extension
Conversations
Uh oh!
There was an error while loading. Please reload this page.
Jump to
Uh oh!
There was an error while loading. Please reload this page.
…h queires ## What changes were proposed in this pull request? Add a new Kafka Sink and Kafka Relation for writing streaming and batch queries, respectively, to Apache Kafka. ### Streaming Kafka Sink - When addBatch is called -- If batchId is great than the last written batch --- Write batch to Kafka ---- Topic will be taken from the record, if present, or from a topic option, which overrides topic in record. -- Else ignore ### Batch Kafka Sink - KafkaSourceProvider will implement CreatableRelationProvider - CreatableRelationProvider#createRelation will write the passed in Dataframe to a Kafka - Topic will be taken from the record, if present, or from topic option, which overrides topic in record. - Save modes Append and ErrorIfExist supported under identical semantics. Other save modes result in an AnalysisException tdas zsxwing ## How was this patch tested? ### The following unit tests will be included - write to stream with topic field: valid stream write with data that includes an existing topic in the schema - write structured streaming aggregation w/o topic field, with default topic: valid stream write with data that does not include a topic field, but the configuration includes a default topic - write data with bad schema: various cases of writing data that does not conform to a proper schema e.g., 1. no topic field or default topic, and 2. no value field - write data with valid schema but wrong types: data with a complete schema but wrong types e.g., key and value types are integers. - write to non-existing topic: write a stream to a topic that does not exist in Kafka, which has been configured to not auto-create topics. - write batch to kafka: simple write batch to Kafka, which goes through the same code path as streaming scenario, so validity checks will not be redone here. ### Examples ```scala // Structured Streaming val writer = inputStringStream.map(s => s.get(0).toString.getBytes()).toDF("value") .selectExpr("value as key", "value as value") .writeStream .format("kafka") .option("checkpointLocation", checkpointDir) .outputMode(OutputMode.Append) .option("kafka.bootstrap.servers", brokerAddress) .option("topic", topic) .queryName("kafkaStream") .start() // Batch val df = spark .sparkContext .parallelize(Seq("1", "2", "3", "4", "5")) .map(v => (topic, v)) .toDF("topic", "value") df.write .format("kafka") .option("kafka.bootstrap.servers",brokerAddress) .option("topic", topic) .save() ``` Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Tyson Condie <[email protected]> Closes #17043 from tcondie/kafka-writer.Uh oh!
There was an error while loading. Please reload this page.
There are no files selected for viewing
Uh oh!
There was an error while loading. Please reload this page.