11package org .replicadb .manager ;
22
3+ import com .google .common .io .CharSource ;
34import com .mysql .cj .jdbc .JdbcPreparedStatement ;
45import org .apache .commons .codec .binary .Hex ;
6+ import org .apache .commons .io .input .ReaderInputStream ;
57import org .mariadb .jdbc .MariaDbStatement ;
68
79import org .apache .logging .log4j .LogManager ;
1012import org .replicadb .cli .ToolOptions ;
1113import org .replicadb .manager .util .BandwidthThrottling ;
1214
13- import java .io .ByteArrayInputStream ;
1415import java .io .IOException ;
1516import java .nio .charset .StandardCharsets ;
1617import java .sql .*;
@@ -82,7 +83,6 @@ public int insertDataToTable (ResultSet resultSet, int taskId) throws SQLExcepti
8283 StringBuilder row = new StringBuilder ();
8384 StringBuilder cols = new StringBuilder ();
8485
85- byte [] bytes = "" .getBytes ();
8686 String colValue ;
8787 int rowCounts = 0 ;
8888 int batchSize = options .getFetchSize ();
@@ -132,17 +132,8 @@ public int insertDataToTable (ResultSet resultSet, int taskId) throws SQLExcepti
132132 row .append ("\n " );
133133
134134 // Copy data to mysql
135- bytes = row .toString ().getBytes (StandardCharsets .UTF_8 );
136-
137135 if (++rowCounts % batchSize == 0 ) {
138- if (mysqlStatement != null ) {
139- mysqlStatement .setLocalInfileInputStream (new ByteArrayInputStream (bytes ));
140- mysqlStatement .executeUpdate (loadDataSql );
141- } else {
142- assert mariadbStatement != null ;
143- mariadbStatement .setLocalInfileInputStream (new ByteArrayInputStream (bytes ));
144- mariadbStatement .executeUpdate (loadDataSql );
145- }
136+ copyData (loadDataSql , row , mariadbStatement , mysqlStatement );
146137
147138 // Clear StringBuilders
148139 row .setLength (0 ); // set length of buffer to 0
@@ -159,14 +150,7 @@ public int insertDataToTable (ResultSet resultSet, int taskId) throws SQLExcepti
159150
160151 // insert remaining records
161152 if (rowCounts != 0 ) {
162- if (mysqlStatement != null ) {
163- mysqlStatement .setLocalInfileInputStream (new ByteArrayInputStream (bytes ));
164- mysqlStatement .executeUpdate (loadDataSql );
165- } else {
166- assert mariadbStatement != null ;
167- mariadbStatement .setLocalInfileInputStream (new ByteArrayInputStream (bytes ));
168- mariadbStatement .executeUpdate (loadDataSql );
169- }
153+ copyData (loadDataSql , row , mariadbStatement , mysqlStatement );
170154 }
171155
172156 } catch (Exception e ) {
@@ -178,6 +162,17 @@ public int insertDataToTable (ResultSet resultSet, int taskId) throws SQLExcepti
178162 return totalRows ;
179163 }
180164
165+ private void copyData (String loadDataSql , StringBuilder row , MariaDbStatement mariadbStatement , JdbcPreparedStatement mysqlStatement ) throws IOException , SQLException {
166+ if (mysqlStatement != null ) {
167+ mysqlStatement .setLocalInfileInputStream (new ReaderInputStream (CharSource .wrap (row ).openStream (), StandardCharsets .UTF_8 ));
168+ mysqlStatement .executeUpdate (loadDataSql );
169+ } else {
170+ assert mariadbStatement != null ;
171+ mariadbStatement .setLocalInfileInputStream (new ReaderInputStream (CharSource .wrap (row ).openStream (), StandardCharsets .UTF_8 ));
172+ mariadbStatement .executeUpdate (loadDataSql );
173+ }
174+ }
175+
181176 private String getLoadDataSql (String tableName , String allColumns , ResultSetMetaData rsmd ) throws SQLException {
182177 StringBuilder loadDataSql = new StringBuilder ();
183178 loadDataSql .append ("LOAD DATA LOCAL INFILE 'dummy' INTO TABLE " );
0 commit comments