# Abort on the first failing command. pipefail additionally makes a pipeline
# fail when ANY stage fails, so a failed `spark-submit ... | tee log` is no
# longer hidden behind the exit status of `tee`.
set -eo pipefail

# ---- Positional arguments (all optional) ----
# $1: folder holding the raw input; passed to the jobs as --input_folder and
#     expected to contain the day_23 file that is split further down.
export INPUT_PATH="${1:-/data/dlrm/criteo}"
# $2: output root; models/, train/, test/, validation/ and temp/ live under it.
export OUTPUT_PATH="${2:-/data/dlrm/spark/output}"
# $3: value forwarded to spark_data_utils.py as --frequency_limit.
export FREQUENCY_LIMIT="${3:-15}"

# Exported so the Spark daemons and jobs launched below inherit it.
export SPARK_LOCAL_DIRS='/data/dlrm/spark/tmp'

# ---- Resource budget ----
# Cores are divided evenly between executors: 80 / 8 = 10 cores each.
export TOTAL_CORES=80
export NUM_EXECUTORS=8
export NUM_EXECUTOR_CORES=$((TOTAL_CORES / NUM_EXECUTORS))

# Memory figures are plain integers; a "G" suffix is appended where they are
# handed to spark-submit. The driver share is taken off the total first and
# the remainder is split between executors: (800 - 32) / 8 = 96 each.
export TOTAL_MEMORY=800
export DRIVER_MEMORY=32
export EXECUTOR_MEMORY=$(((TOTAL_MEMORY - DRIVER_MEMORY) / NUM_EXECUTORS))

# Extra application flags for the dictionary-generation job. Kept as a single
# space-separated string; the consumer expands it unquoted to split it.
OPTS="--frequency_limit $FREQUENCY_LIMIT"

# ---- Spark / Java locations ----
export SPARK_HOME=/opt/spark
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
# Put the Spark launchers (spark-submit, start-master.sh, ...) on PATH.
export PATH="$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH"

# URL of the standalone master that is started on this host.
export MASTER="spark://$HOSTNAME:7077"
# Bring up a single-node Spark standalone cluster: one master plus one worker
# registered against $MASTER.
echo "Starting spark standalone"
start-master.sh
# Spark 3.1+ ships the worker launcher as start-worker.sh and keeps
# start-slave.sh only as a deprecated alias; prefer the new name when it is
# on PATH and fall back to the old one for older installations.
if command -v start-worker.sh >/dev/null 2>&1; then
  start-worker.sh "$MASTER"
else
  start-slave.sh "$MASTER"
fi
# Step 1: run spark_data_utils.py in generate_models mode over days 0-23 and
# write the result to $OUTPUT_PATH/models. Combined stdout/stderr is shown on
# the terminal and saved to submit_dict_log.txt.
echo "Generating the dictionary..."
# Arguments are collected in an array so every path is passed as exactly one
# word, even if it contains whitespace.
# shellcheck disable=SC2206  # $OPTS is split on purpose, see below
dict_submit_args=(
  # -- spark-submit options --
  --master "$MASTER"
  --driver-memory "${DRIVER_MEMORY}G"
  --executor-cores "$NUM_EXECUTOR_CORES"
  --executor-memory "${EXECUTOR_MEMORY}G"
  --conf "spark.cores.max=$TOTAL_CORES"
  --conf spark.task.cpus=1
  --conf spark.sql.files.maxPartitionBytes=1073741824
  --conf spark.sql.shuffle.partitions=600
  --conf spark.driver.maxResultSize=2G
  --conf spark.locality.wait=0s
  --conf spark.network.timeout=1800s
  # -- application and its flags --
  spark_data_utils.py --mode generate_models
  # OPTS is a space-separated string of flags; it is intentionally left
  # unquoted so that it expands into separate arguments.
  $OPTS
  --input_folder "$INPUT_PATH"
  --days 0-23
  --model_folder "$OUTPUT_PATH/models"
  --write_mode overwrite --low_mem
)
spark-submit "${dict_submit_args[@]}" 2>&1 | tee submit_dict_log.txt
# The pipeline's own status is that of tee; inspect spark-submit's status
# explicitly so a failed job stops the script instead of being ignored.
dict_submit_status=${PIPESTATUS[0]}
if [[ "$dict_submit_status" -ne 0 ]]; then
  echo "spark-submit (generate_models) failed with exit code $dict_submit_status" >&2
  exit "$dict_submit_status"
fi
# Step 2: run spark_data_utils.py in transform mode over days 0-22, reading
# the models from $OUTPUT_PATH/models and writing to $OUTPUT_PATH/train.
# $OUTPUT_PATH/model_size.json is passed as --model_size_file. Combined
# stdout/stderr is shown on the terminal and saved to submit_train_log.txt.
echo "Transforming the train data from day_0 to day_22..."
# Arguments are collected in an array so every path is passed as exactly one
# word, even if it contains whitespace.
train_submit_args=(
  # -- spark-submit options --
  --master "$MASTER"
  --driver-memory "${DRIVER_MEMORY}G"
  --executor-cores "$NUM_EXECUTOR_CORES"
  --executor-memory "${EXECUTOR_MEMORY}G"
  --conf "spark.cores.max=$TOTAL_CORES"
  --conf spark.task.cpus=1
  --conf spark.sql.files.maxPartitionBytes=1073741824
  --conf spark.sql.shuffle.partitions=600
  --conf spark.driver.maxResultSize=2G
  --conf spark.locality.wait=0s
  --conf spark.network.timeout=1800s
  # -- application and its flags --
  spark_data_utils.py --mode transform
  --input_folder "$INPUT_PATH"
  --days 0-22
  --output_folder "$OUTPUT_PATH/train"
  --model_size_file "$OUTPUT_PATH/model_size.json"
  --model_folder "$OUTPUT_PATH/models"
  --write_mode overwrite --low_mem
)
spark-submit "${train_submit_args[@]}" 2>&1 | tee submit_train_log.txt
# The pipeline's own status is that of tee; inspect spark-submit's status
# explicitly so a failed job stops the script instead of being ignored.
train_submit_status=${PIPESTATUS[0]}
if [[ "$train_submit_status" -ne 0 ]]; then
  echo "spark-submit (transform train) failed with exit code $train_submit_status" >&2
  exit "$train_submit_status"
fi
# Step 3: cut $INPUT_PATH/day_23 into two halves by line count. The first
# half goes to $OUTPUT_PATH/temp/test/day_23, the remainder to
# $OUTPUT_PATH/temp/validation/day_23.
echo "Splitting the last day into 2 parts of test and validation..."
# ${VAR:?} aborts if a root path is empty, so no directory is ever created
# (or later removed) directly under "/".
last_day="${INPUT_PATH:?INPUT_PATH must be set}/day_23"
temp_test="${OUTPUT_PATH:?OUTPUT_PATH must be set}/temp/test"
temp_validation="$OUTPUT_PATH/temp/validation"
# Make sure the temporary halves are removed on every exit path, including
# a failure in one of the later steps. -f keeps this quiet when the
# directories have already been removed.
trap 'rm -rf -- "$temp_test" "$temp_validation"' EXIT
mkdir -p -- "$temp_test" "$temp_validation"
# Reading from stdin makes wc print only the count (no file name to strip).
lines=$(wc -l < "$last_day")
former=$((lines / 2))
head -n "$former" -- "$last_day" > "$temp_test/day_23"
# "+N" means "start at line N": everything after the first half is kept.
# Unlike "last (lines - former) lines", this cannot lose a line when the
# file does not end with a newline (wc -l then counts one line fewer than
# head/tail see).
tail -n "+$((former + 1))" -- "$last_day" > "$temp_validation/day_23"
# Step 4: run spark_data_utils.py in transform mode on the test half of
# day_23 (read from $temp_test) and write to $OUTPUT_PATH/test, with
# --output_ordering input. Combined stdout/stderr is shown on the terminal
# and saved to submit_test_log.txt.
echo "Transforming the test data in day_23..."
# Arguments are collected in an array so every path is passed as exactly one
# word, even if it contains whitespace.
test_submit_args=(
  # -- spark-submit options --
  --master "$MASTER"
  --driver-memory "${DRIVER_MEMORY}G"
  --executor-cores "$NUM_EXECUTOR_CORES"
  --executor-memory "${EXECUTOR_MEMORY}G"
  --conf "spark.cores.max=$TOTAL_CORES"
  --conf spark.task.cpus=1
  --conf spark.sql.files.maxPartitionBytes=1073741824
  --conf spark.sql.shuffle.partitions=30
  --conf spark.driver.maxResultSize=2G
  --conf spark.locality.wait=0s
  --conf spark.network.timeout=1800s
  # -- application and its flags --
  spark_data_utils.py --mode transform
  --input_folder "$temp_test"
  --days 23-23
  --output_folder "$OUTPUT_PATH/test"
  --output_ordering input
  --model_folder "$OUTPUT_PATH/models"
  --write_mode overwrite --low_mem
)
spark-submit "${test_submit_args[@]}" 2>&1 | tee submit_test_log.txt
# The pipeline's own status is that of tee; inspect spark-submit's status
# explicitly so a failed job stops the script instead of being ignored.
test_submit_status=${PIPESTATUS[0]}
if [[ "$test_submit_status" -ne 0 ]]; then
  echo "spark-submit (transform test) failed with exit code $test_submit_status" >&2
  exit "$test_submit_status"
fi
# Step 5: run spark_data_utils.py in transform mode on the validation half
# of day_23 (read from $temp_validation) and write to
# $OUTPUT_PATH/validation, with --output_ordering input. Combined
# stdout/stderr is shown on the terminal and saved to
# submit_validation_log.txt.
echo "Transforming the validation data in day_23..."
# Arguments are collected in an array so every path is passed as exactly one
# word, even if it contains whitespace.
validation_submit_args=(
  # -- spark-submit options --
  --master "$MASTER"
  --driver-memory "${DRIVER_MEMORY}G"
  --executor-cores "$NUM_EXECUTOR_CORES"
  --executor-memory "${EXECUTOR_MEMORY}G"
  --conf "spark.cores.max=$TOTAL_CORES"
  --conf spark.task.cpus=1
  --conf spark.sql.files.maxPartitionBytes=1073741824
  --conf spark.sql.shuffle.partitions=30
  --conf spark.driver.maxResultSize=2G
  --conf spark.locality.wait=0s
  --conf spark.network.timeout=1800s
  # -- application and its flags --
  spark_data_utils.py --mode transform
  --input_folder "$temp_validation"
  --days 23-23
  --output_folder "$OUTPUT_PATH/validation"
  --output_ordering input
  --model_folder "$OUTPUT_PATH/models"
  --write_mode overwrite --low_mem
)
spark-submit "${validation_submit_args[@]}" 2>&1 | tee submit_validation_log.txt
# The pipeline's own status is that of tee; inspect spark-submit's status
# explicitly so a failed job stops the script instead of being ignored.
validation_submit_status=${PIPESTATUS[0]}
if [[ "$validation_submit_status" -ne 0 ]]; then
  echo "spark-submit (transform validation) failed with exit code $validation_submit_status" >&2
  exit "$validation_submit_status"
fi
# Remove the temporary day_23 halves now that both have been transformed.
# Quoting keeps each path a single argument, "--" stops option parsing, and
# ${VAR:?} aborts instead of running rm on an empty path.
rm -r -- "${temp_test:?}" "${temp_validation:?}"