|
27 | 27 | import org.apache.dolphinscheduler.common.utils.FileUtils; |
28 | 28 | import org.apache.dolphinscheduler.common.utils.JSONUtils; |
29 | 29 | import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider; |
| 30 | +import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; |
30 | 31 | import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; |
31 | 32 | import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; |
32 | 33 | import org.apache.dolphinscheduler.plugin.task.api.TaskException; |
|
45 | 46 | import java.io.IOException; |
46 | 47 | import java.io.InputStream; |
47 | 48 | import java.lang.reflect.Field; |
| 49 | +import java.lang.reflect.Method; |
48 | 50 | import java.nio.file.Files; |
49 | 51 | import java.sql.Connection; |
50 | 52 | import java.sql.PreparedStatement; |
51 | 53 | import java.sql.ResultSet; |
52 | 54 | import java.sql.ResultSetMetaData; |
53 | 55 | import java.sql.SQLException; |
54 | 56 | import java.util.HashMap; |
| 57 | +import java.util.List; |
55 | 58 | import java.util.Map; |
56 | 59 |
|
57 | 60 | import org.junit.jupiter.api.Assertions; |
|
62 | 65 | import org.mockito.Mockito; |
63 | 66 | import org.mockito.junit.jupiter.MockitoExtension; |
64 | 67 |
|
| 68 | +import com.fasterxml.jackson.databind.node.ObjectNode; |
| 69 | + |
65 | 70 | @ExtendWith(MockitoExtension.class) |
66 | 71 | public class DataxTaskTest { |
67 | 72 |
|
@@ -273,6 +278,69 @@ private TaskExecutionContext buildTestTaskExecutionContext() { |
273 | 278 | return taskExecutionContext; |
274 | 279 | } |
275 | 280 |
|
| 281 | + @Test |
| 282 | + public void testBuildDataxJobContentJsonWithBatchSize() throws Exception { |
| 283 | + // set batchSize > 0 via reflection |
| 284 | + Field dataXParametersField = DataxTask.class.getDeclaredField("dataXParameters"); |
| 285 | + dataXParametersField.setAccessible(true); |
| 286 | + DataxParameters params = (DataxParameters) dataXParametersField.get(dataxTask); |
| 287 | + params.setBatchSize(1024); |
| 288 | + params.setDsType("MYSQL"); |
| 289 | + params.setDtType("MYSQL"); |
| 290 | + |
| 291 | + // set dataxTaskExecutionContext via reflection |
| 292 | + DataxTaskExecutionContext ctx = new DataxTaskExecutionContext(); |
| 293 | + ctx.setSourcetype(DbType.MYSQL); |
| 294 | + ctx.setTargetType(DbType.MYSQL); |
| 295 | + ctx.setSourceConnectionParams( |
| 296 | + "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://localhost:3306\"}"); |
| 297 | + ctx.setTargetConnectionParams( |
| 298 | + "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://localhost:3306\"}"); |
| 299 | + |
| 300 | + Field ctxField = DataxTask.class.getDeclaredField("dataxTaskExecutionContext"); |
| 301 | + ctxField.setAccessible(true); |
| 302 | + ctxField.set(dataxTask, ctx); |
| 303 | + |
| 304 | + BaseConnectionParam mockConnParam = mock(BaseConnectionParam.class); |
| 305 | + when(mockConnParam.getUser()).thenReturn("root"); |
| 306 | + when(mockConnParam.getPassword()).thenReturn("123456"); |
| 307 | + when(mockConnParam.getCompatibleMode()).thenReturn(null); |
| 308 | + |
| 309 | + try ( |
| 310 | + MockedStatic<DataSourceUtils> mockedDataSourceUtils = mockStatic(DataSourceUtils.class); |
| 311 | + MockedStatic<DataSourceClientProvider> mockedProvider = mockStatic(DataSourceClientProvider.class)) { |
| 312 | + |
| 313 | + mockedDataSourceUtils |
| 314 | + .when(() -> DataSourceUtils.buildConnectionParams(Mockito.any(DbType.class), Mockito.anyString())) |
| 315 | + .thenReturn(mockConnParam); |
| 316 | + mockedDataSourceUtils.when(() -> DataSourceUtils.getJdbcUrl(Mockito.any(DbType.class), Mockito.any())) |
| 317 | + .thenReturn("jdbc:mysql://localhost:3306/test"); |
| 318 | + |
| 319 | + Connection connection = mock(Connection.class); |
| 320 | + mockedProvider.when(() -> DataSourceClientProvider.getAdHocConnection(Mockito.any(), Mockito.any())) |
| 321 | + .thenReturn(connection); |
| 322 | + |
| 323 | + PreparedStatement stmt = mock(PreparedStatement.class); |
| 324 | + when(connection.prepareStatement(anyString())).thenReturn(stmt); |
| 325 | + ResultSetMetaData md = mock(ResultSetMetaData.class); |
| 326 | + when(md.getColumnCount()).thenReturn(1); |
| 327 | + when(md.getColumnLabel(eq(1))).thenReturn("col1"); |
| 328 | + ResultSet resultSet = mock(ResultSet.class); |
| 329 | + when(resultSet.getMetaData()).thenReturn(md); |
| 330 | + when(stmt.executeQuery()).thenReturn(resultSet); |
| 331 | + |
| 332 | + Method method = DataxTask.class.getDeclaredMethod("buildDataxJobContentJson"); |
| 333 | + method.setAccessible(true); |
| 334 | + @SuppressWarnings("unchecked") |
| 335 | + List<ObjectNode> result = (List<ObjectNode>) method.invoke(dataxTask); |
| 336 | + |
| 337 | + Assertions.assertEquals(1, result.size()); |
| 338 | + ObjectNode writerParam = (ObjectNode) result.get(0).get("writer").get("parameter"); |
| 339 | + Assertions.assertTrue(writerParam.has("batchSize")); |
| 340 | + Assertions.assertEquals(1024, writerParam.get("batchSize").asInt()); |
| 341 | + } |
| 342 | + } |
| 343 | + |
276 | 344 | private String getJsonString() { |
277 | 345 | return "{\n" + |
278 | 346 | " \"job\": {\n" + |
|
0 commit comments