3030import org .apache .flink .metrics .MetricGroup ;
3131import org .apache .flink .table .api .DataTypes ;
3232import org .apache .flink .table .types .DataType ;
33+ import org .apache .flink .table .types .logical .LogicalType ;
34+ import org .apache .flink .table .types .logical .RowType ;
3335import org .apache .flink .util .Collector ;
3436import org .apache .flink .util .Preconditions ;
3537
9395import static com .ververica .cdc .connectors .mysql .source .utils .RecordUtils .isSchemaChangeEvent ;
9496import static com .ververica .cdc .connectors .mysql .source .utils .RecordUtils .isWatermarkEvent ;
9597import static java .lang .String .format ;
98+ import static org .apache .flink .core .io .InputStatus .MORE_AVAILABLE ;
9699import static org .apache .flink .util .Preconditions .checkState ;
97100import static org .junit .Assert .assertEquals ;
98101import static org .junit .Assert .assertFalse ;
@@ -107,6 +110,107 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
107110 private final UniqueDatabase inventoryDatabase =
108111 new UniqueDatabase (MYSQL_CONTAINER , "inventory" , "mysqluser" , "mysqlpw" );
109112
113+ @ Test
114+ public void testFinishedUnackedSplitsUsingStateFromSnapshotPhase () throws Exception {
115+ customerDatabase .createAndInitialize ();
116+ final MySqlSourceConfig sourceConfig = getConfig (new String [] {"customers" });
117+ final DataType dataType =
118+ DataTypes .ROW (
119+ DataTypes .FIELD ("id" , DataTypes .BIGINT ()),
120+ DataTypes .FIELD ("name" , DataTypes .STRING ()),
121+ DataTypes .FIELD ("address" , DataTypes .STRING ()),
122+ DataTypes .FIELD ("phone_number" , DataTypes .STRING ()));
123+ List <MySqlSplit > snapshotSplits ;
124+ try (MySqlConnection jdbc = DebeziumUtils .createMySqlConnection (sourceConfig )) {
125+ Map <TableId , TableChanges .TableChange > tableSchemas =
126+ TableDiscoveryUtils .discoverSchemaForCapturedTables (
127+ new MySqlPartition (
128+ sourceConfig .getMySqlConnectorConfig ().getLogicalName ()),
129+ sourceConfig ,
130+ jdbc );
131+ TableId tableId = new TableId (customerDatabase .getDatabaseName (), null , "customers" );
132+ RowType splitType =
133+ RowType .of (
134+ new LogicalType [] {DataTypes .INT ().getLogicalType ()},
135+ new String [] {"id" });
136+ snapshotSplits =
137+ Arrays .asList (
138+ new MySqlSnapshotSplit (
139+ tableId ,
140+ tableId + ":0" ,
141+ splitType ,
142+ null ,
143+ new Integer [] {200 },
144+ null ,
145+ tableSchemas ),
146+ new MySqlSnapshotSplit (
147+ tableId ,
148+ tableId + ":1" ,
149+ splitType ,
150+ new Integer [] {200 },
151+ new Integer [] {1500 },
152+ null ,
153+ tableSchemas ),
154+ new MySqlSnapshotSplit (
155+ tableId ,
156+ tableId + ":2" ,
157+ splitType ,
158+ new Integer [] {1500 },
159+ null ,
160+ null ,
161+ tableSchemas ));
162+ }
163+
164+ // Step 1: start source reader and assign snapshot splits
165+ MySqlSourceReader <SourceRecord > reader = createReader (sourceConfig , -1 );
166+ reader .start ();
167+ reader .addSplits (snapshotSplits );
168+
169+ String [] expectedRecords =
170+ new String [] {
171+ "+I[111, user_6, Shanghai, 123567891234]" ,
172+ "+I[110, user_5, Shanghai, 123567891234]" ,
173+ "+I[101, user_1, Shanghai, 123567891234]" ,
174+ "+I[103, user_3, Shanghai, 123567891234]" ,
175+ "+I[102, user_2, Shanghai, 123567891234]" ,
176+ "+I[118, user_7, Shanghai, 123567891234]" ,
177+ "+I[121, user_8, Shanghai, 123567891234]" ,
178+ "+I[123, user_9, Shanghai, 123567891234]" ,
179+ "+I[109, user_4, Shanghai, 123567891234]" ,
180+ "+I[1009, user_10, Shanghai, 123567891234]" ,
181+ "+I[1011, user_12, Shanghai, 123567891234]" ,
182+ "+I[1010, user_11, Shanghai, 123567891234]" ,
183+ "+I[1013, user_14, Shanghai, 123567891234]" ,
184+ "+I[1012, user_13, Shanghai, 123567891234]" ,
185+ "+I[1015, user_16, Shanghai, 123567891234]" ,
186+ "+I[1014, user_15, Shanghai, 123567891234]" ,
187+ "+I[1017, user_18, Shanghai, 123567891234]" ,
188+ "+I[1016, user_17, Shanghai, 123567891234]" ,
189+ "+I[1019, user_20, Shanghai, 123567891234]" ,
190+ "+I[1018, user_19, Shanghai, 123567891234]" ,
191+ "+I[2000, user_21, Shanghai, 123567891234]"
192+ };
193+ // Step 2: wait the snapshot splits finished reading
194+ Thread .sleep (5000L );
195+ List <String > actualRecords = consumeRecords (reader , dataType );
196+ assertEqualsInAnyOrder (Arrays .asList (expectedRecords ), actualRecords );
197+
198+ // Step 3: snapshot reader's state
199+ List <MySqlSplit > splitsState = reader .snapshotState (1L );
200+
201+ // Step 4: restart reader from a restored state
202+ MySqlSourceReader <SourceRecord > restartReader = createReader (sourceConfig , -1 );
203+ restartReader .start ();
204+ restartReader .addSplits (splitsState );
205+
206+ // Step 5: check the finished unacked splits between original reader and restarted reader
207+ assertEquals (3 , reader .getFinishedUnackedSplits ().size ());
208+ assertMapEquals (
209+ restartReader .getFinishedUnackedSplits (), reader .getFinishedUnackedSplits ());
210+ reader .close ();
211+ restartReader .close ();
212+ }
213+
110214 @ Test
111215 public void testBinlogReadFailoverCrossTransaction () throws Exception {
112216 customerDatabase .createAndInitialize ();
@@ -411,8 +515,9 @@ private List<String> consumeRecords(
411515 MySqlSourceReader <SourceRecord > sourceReader , DataType recordType ) throws Exception {
412516 // Poll all the n records of the single split.
413517 final SimpleReaderOutput output = new SimpleReaderOutput ();
414- while (output .getResults ().size () == 0 ) {
415- sourceReader .pollNext (output );
518+ InputStatus status = MORE_AVAILABLE ;
519+ while (MORE_AVAILABLE == status || output .getResults ().size () == 0 ) {
520+ status = sourceReader .pollNext (output );
416521 }
417522 final RecordsFormatter formatter = new RecordsFormatter (recordType );
418523 return formatter .format (output .getResults ());
0 commit comments