DataFileReader.java

/*
 * Copyright 2013 University of Glasgow.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package broadwick.data.readers;

import broadwick.BroadwickConstants;
import broadwick.BroadwickException;
import broadwick.io.FileInput;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;

/**
 * Base class for all data file readers, containing useful common functionality..
 */
@Slf4j
public abstract class DataFileReader {

    /**
     * Convert a collection of elements read from the data file (the column names in the internal database) into a CSV
     * string.
     * @param data the collection of column names.
     * @return a csv string of the input data.
     */
    protected final String asCsv(final Collection<String> data) {
        final StringBuilder sb = new StringBuilder();
        for (String name : data) {
            if (name != null) {
                sb.append(name).append(",");
            }
        }
        // remove the last character.
        final String csv = sb.toString();
        return csv.substring(0, csv.length() - 1);
    }

    /**
     * Convert a collection of elements read from the data file (the column names in the internal database) into a CSV
     * string of question marks.
     * @param data the collection of column names.
     * @return as csv string of the question marks containing the same number of data elements.
     */
    protected final String asQuestionCsv(final Collection<String> data) {
        final StringBuilder sb = new StringBuilder();
        for (String name : data) {
            if (name != null) {
                sb.append("?,");
            }
        }
        // remove the last character.
        final String csv = sb.toString();
        return csv.substring(0, csv.length() - 1);
    }

    /**
     * Create or append to a string command in the form "CREATE TABLE IF NOT EXISTS [table] ([col1], [col2], ...,
     * [coln]) values (?,?,...,?)" from the details read from the configuration file.
     * <pre>
     * updateCreateTableCommand(ID, 1, " VARCHAR(128), ", new HashMap<String, Integer>(), createTableCommand,
     * TABLE_NAME, SECTION_NAME, errors);
     * </pre>
     * @param columnName         the name of the column in the table
     * @param columnIndex        the location in the csv file where it appears.
     * @param columnType         the type of column e.g. "INT", "VARCHAR(128) NOT NULL" etc.
     * @param insertedColDetails a map containing the name of the column and the location in the csv file where it
     *                           appears i.e. columnName, columnIndex.
     * @param createTableCommand a StringBuilder object that we append to to create the create table command.
     * @param tableName          the name of the table that is being created.
     * @param sectionName        the name of the section in the configuration file that configures the data.
     * @param errors             a stringbuilder object to which we will append error messages.
     * @return the name of the column added.
     */
    protected final String updateCreateTableCommand(final String columnName, final Integer columnIndex, final String columnType,
                                                    final Map<String, Integer> insertedColDetails,
                                                    final StringBuilder createTableCommand, final String tableName,
                                                    final String sectionName, final StringBuilder errors) {
        if (createTableCommand.length() == 0) {
            createTableCommand.append(String.format("CREATE TABLE %s (", tableName));
        }

        String addedColumnName = null;
        if (columnIndex != null) {
            if (columnIndex != 0) {
                createTableCommand.append(columnName).append(columnType);
                insertedColDetails.put(columnName, columnIndex);
                addedColumnName = columnName;
            } else {
                errors.append("No ").append(columnName).append(" column set in ").append(sectionName).append(" section\n");
            }
        }

        return addedColumnName;
    }

    /**
     * Execute a command to create a table.
     * @param tableName          the name of the table to be created.
     * @param createTableCommand the command to create the table.
     * @param connection         the database connection to use to create the table.
     * @throws SQLException if a SQL error has been encountered.
     */
    protected final void createTable(final String tableName, final String createTableCommand, final Connection connection)
            throws SQLException {

        final DatabaseMetaData dbm = connection.getMetaData();

        // First check if the table already exists, some databases do not support
        // CREATE TABLE ??? IF NOT EXISTS
        // so we have to look at the database schema
        try (ResultSet resultSet = dbm.getTables(null, null, "%", null)) {
            boolean tableExists = false;
            while (resultSet.next()) {
                if (tableName.equalsIgnoreCase(resultSet.getString("TABLE_NAME"))) {
                    log.debug("Table {} already exists, ignoring", tableName);
                    tableExists = true;
                }
            }

            if (!tableExists) {
                try (Statement stmt = connection.createStatement()) {
                    final String[] commands = createTableCommand.split(";");
                    for (int i = 0; i < commands.length; i++) {
                        log.trace("Creating table {}", commands[i]);
                        stmt.execute(commands[i]);
                    }
                } catch (SQLException sqle) {
                    connection.rollback();
                    log.error("Error while creating the table '{}'. {}", createTableCommand,
                              Throwables.getStackTraceAsString(sqle));
                    throw sqle;
                }
            }

//        } catch (Exception e) {
//            log.error("Could not create database {}", Throwables.getStackTraceAsString(e));
        }

        connection.commit();
    }

    /**
     * Perform the insertion into the database.
     * @param connection      the connection to the database.
     * @param tableName       the name of the table into which the data will be put.
     * @param insertString    the command used to insert a row into the database.
     * @param dataFile        the [CSV] file that contained the data.
     * @param dateFormat      the format of the date in the file.
     * @param insertedColInfo a map of column name to column in the data file.
     * @param dateFields      a collection of columns in the csv file that contains date fields.
     * @return the number of rows inserted.
     */
    protected final int insert(final Connection connection, final String tableName,
                               final String insertString,
                               final String dataFile,
                               final String dateFormat,
                               final Map<String, Integer> insertedColInfo,
                               final Collection<Integer> dateFields) {

        int inserted = 0;
        try {
            // Now do the insertion.
            log.trace("Inserting into {} via {}", tableName, insertString);
            PreparedStatement pstmt = connection.prepareStatement(insertString);
            log.trace("Prepared statement = {}", pstmt.toString());

            try (FileInput instance = new FileInput(dataFile, ",")) {
                final StopWatch sw = new StopWatch();
                sw.start();
                List<String> data = instance.readLine();
                while (data != null && !data.isEmpty()) {
                    int parameterIndex = 1;
                    for (Map.Entry<String, Integer> entry : insertedColInfo.entrySet()) {
                        if (entry.getValue() == -1) {
                            pstmt.setObject(parameterIndex, null);
                        } else {
                            final String value = data.get(entry.getValue() - 1);
                            if (dateFields.contains(entry.getValue())) {
                                int dateField = Integer.MAX_VALUE;
                                if (value != null && !value.isEmpty()) {
                                    dateField = BroadwickConstants.getDate(value, dateFormat);
                                }
                                pstmt.setObject(parameterIndex, dateField);
                            } else {
                                pstmt.setObject(parameterIndex, value);
                            }
                        }
                        parameterIndex++;
                    }
                    pstmt.addBatch();
                    try {
                        pstmt.executeUpdate();
                        inserted++;
                    } catch (SQLException ex) {
                        if ("23505".equals(ex.getSQLState())) {
                            //Ignore found duplicate from database view
                            continue;
                        } else {
                            log.warn("Duplicate data found for {}: continuing despite errors: {}", data.get(0), ex.getLocalizedMessage());
                            log.trace("{}", Throwables.getStackTraceAsString(ex));
                            throw ex;
                        }
                    }
                    if (inserted % 250000 == 0) {
                        log.trace("Inserted {} rows in {}", inserted, sw.toString());
                        connection.commit();
                        pstmt.close();
                        pstmt = connection.prepareStatement(insertString);
                    }

                    data = instance.readLine();
                }
                connection.commit();

            } catch (IOException ex) {
                log.error("IO error : {}", ex.getLocalizedMessage());
                log.trace("{}", Throwables.getStackTraceAsString(ex));
            } catch (SQLException ex) {
                log.error("SQL Error : {}", ex.getLocalizedMessage());
                log.trace("{}", Throwables.getStackTraceAsString(ex));
                throw ex;
            } finally {
                pstmt.close();
            }
        } catch (SQLException ex) {
            log.error("{}", ex.getLocalizedMessage());
            log.trace("{}", Throwables.getStackTraceAsString(ex));
            throw new BroadwickException(ex);
        }

        return inserted;
    }

    /**
     * Insert the data from the input file into the database. The data structure has been read and the database set up
     * already so this method simply reads the file and extracts the relevant information, storing it in the database.
     * @return the number of rows read
     */
    public abstract int insert();
}