CQU Big Data Computing, Lab 4: Structured Data Analysis and Processing with SparkSQL / Spark Structured Stream Processing

1 Structured Data Analysis and Processing with SparkSQL

1.1 Basic Use of SparkSQL

1.1.1 Level 1: First Look at SparkSQL

sparksql/src/main/java/com/educoder/bigData/sparksql/Test1.java

package com.educoder.bigData.sparksql;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;


public class Test1 {
	

	public static void main(String[] args) throws AnalysisException {
		/********* Begin *********/
		SparkSession spark = SparkSession
				.builder()
				.appName("Java Spark SQL基本示例")
				.master("local")
				.config("spark.some.config.option", "some-value")
				.getOrCreate();
		// Print the Spark version number
		System.out.println(spark.version());
		/********* End *********/
	}
}

1.1.2 Level 2: Creating and Using a Dataset

sparksql/src/main/java/com/educoder/bigData/sparksql/Test2.java

package com.educoder.bigData.sparksql;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;

public class Test2 {


	public static void main(String[] args) throws AnalysisException {
		
		SparkSession spark = SparkSession
				.builder()
				.appName("test1")
				.master("local")
				.config("spark.some.config.option", "some-value")
				.getOrCreate();
		/********* Begin *********/
		// Read people.json to create a Dataset
		Dataset<Row> df = spark.read().json("people.json");

		// Filter out records where age is 23
		Dataset<Row> filteredDF = df.filter(col("age").notEqual(23));

		// Show the first 20 rows of the result
		filteredDF.show();
		/********* End *********/
	}

	

}

1.1.3 Level 3: Dataset User-Defined Functions

sparksql/src/main/java/com/educoder/bigData/sparksql/MyAverage.java

package com.educoder.bigData.sparksql;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class MyAverage extends UserDefinedAggregateFunction {
    
    private static final long serialVersionUID = 1L;
    
    private StructType inputSchema;
    private StructType bufferSchema;

    public MyAverage() {
        /********* Begin *********/
        // Define the data type of the input argument
        List<StructField> inputFields = new ArrayList<>();
        inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
        inputSchema = DataTypes.createStructType(inputFields);
        
        // Define the data types of the aggregation buffer fields
        List<StructField> bufferFields = new ArrayList<>();
        bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
        bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
        bufferSchema = DataTypes.createStructType(bufferFields);
        /********* End *********/
    }
    
    @Override
    public StructType bufferSchema() {
        /********* Begin *********/
        return bufferSchema;
        /********* End *********/
    }

    @Override
    public DataType dataType() {
        /********* Begin *********/
        return DataTypes.DoubleType;
        /********* End *********/
    }

    @Override
    public boolean deterministic() {
        return true;
    }

    @Override
    public Object evaluate(Row buffer) {
        /********* Begin *********/
        if (buffer.getLong(1) == 0) {
            return null;
        }
        return ((double) buffer.getLong(0)) / buffer.getLong(1);
        /********* End *********/
    }

    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        /********* Begin *********/
        buffer.update(0, 0L);  // initialize sum to 0
        buffer.update(1, 0L);  // initialize count to 0
        /********* End *********/    
    }

    @Override
    public StructType inputSchema() {
        /********* Begin *********/
        return inputSchema;
        /********* End *********/
    }

    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        /********* Begin *********/
        long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
        long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
        buffer1.update(0, mergedSum);
        buffer1.update(1, mergedCount);
        /********* End *********/
    }

    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        /********* Begin *********/
        if (!input.isNullAt(0)) {
            long updatedSum = buffer.getLong(0) + input.getLong(0);
            long updatedCount = buffer.getLong(1) + 1;
            buffer.update(0, updatedSum);
            buffer.update(1, updatedCount);
        }
        /********* End *********/
    }
}
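For reference, a UserDefinedAggregateFunction like MyAverage is registered on the SparkSession and then called from SQL. The sketch below follows the standard Spark 2.x registration pattern; the employees.json input file, the view name and the class name MyAverageDemo are assumptions made for illustration and are not part of the graded files.

package com.educoder.bigData.sparksql;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hypothetical usage sketch for MyAverage (not part of the exercise).
public class MyAverageDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("MyAverageDemo")
                .master("local")
                .getOrCreate();
        // Register the UDAF under a SQL-callable name.
        spark.udf().register("myAverage", new MyAverage());
        // "employees.json" with a numeric "salary" column is an assumed input file.
        Dataset<Row> df = spark.read().json("employees.json");
        df.createOrReplaceTempView("employees");
        // Call the UDAF from SQL and show the result.
        spark.sql("SELECT myAverage(salary) AS average_salary FROM employees").show();
    }
}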

1.2 SparkSQL Data Sources

1.2.1 Level 1: Loading and Saving in SparkSQL

sparksql/src/main/java/com/educoder/bigData/sparksql2/Test1.java

package com.educoder.bigData.sparksql2;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class Test1 {
    
    public static void main(String[] args) throws AnalysisException {
        SparkSession spark = SparkSession
                  .builder()
                  .appName("test1")
                  .master("local")
                  .getOrCreate();
        
        /********* Begin *********/
        // 1. Load people.json and save it to the "people" path in overwrite mode
        Dataset<Row> peopleDF = spark.read().json("people.json");
        peopleDF.write().mode(SaveMode.Overwrite).save("people");
        
        // 2. Load people1.json and save it to the "people" path in append mode
        Dataset<Row> people1DF = spark.read().json("people1.json");
        people1DF.write().mode(SaveMode.Append).save("people");
        
        // 3. Read the data under the "people" path and show the first 20 rows
        Dataset<Row> combinedDF = spark.read().load("people");
        combinedDF.show();
        /********* End *********/
    }
}

1.2.2 Level 2: Introduction to Parquet Files

sparksql/src/main/java/com/educoder/bigData/sparksql2/Test2.java

package com.educoder.bigData.sparksql2;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SaveMode;

public class Test2 {

    public static void main(String[] args) throws AnalysisException {
        SparkSession spark = SparkSession
                  .builder()
                  .appName("test1")
                  .master("local")
                  .getOrCreate();
        
        /********* Begin *********/
        // 1. Read people.json and save it to partition id=1
        Dataset<Row> peopleDF = spark.read().json("people.json");
        peopleDF.withColumn("id", org.apache.spark.sql.functions.lit(1))
               .write()
               .mode(SaveMode.Overwrite)
               .partitionBy("id")
               .save("people");
        
        // 2. Read people1.json and save it to partition id=2
        Dataset<Row> people1DF = spark.read().json("people1.json");
        people1DF.withColumn("id", org.apache.spark.sql.functions.lit(2))
                .write()
                .mode(SaveMode.Append)
                .partitionBy("id")
                .save("people");
        
        // 3. Read the merged partitioned data and show it
        Dataset<Row> mergedDF = spark.read().load("people");
        mergedDF.show();
        /********* End *********/
    }
}
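A note on what partitionBy("id") produces: the "people" path ends up with Hive-style subdirectories such as people/id=1 and people/id=2, and reading the root path restores id as an ordinary column. A minimal sketch, reusing the spark session and the "people" path from the code above, of selecting a single partition (the filter on the partition column benefits from partition pruning):

        // Read the partitioned parquet data back and keep only partition id=1
        Dataset<Row> partition1 = spark.read().load("people")
                .filter(org.apache.spark.sql.functions.col("id").equalTo(1));
        partition1.show();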

1.2.3 Level 3: Introduction to JSON Files

sparksql/src/main/java/com/educoder/bigData/sparksql2/Test3.java

package com.educoder.bigData.sparksql2;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Test3 {
    
    public static void main(String[] args) throws AnalysisException {
        SparkSession spark = SparkSession
                  .builder()
                  .appName("test1")
                  .master("local")
                  .getOrCreate();
        
        /********* Begin *********/
        // 1. Read people.json
        Dataset<Row> peopleDF = spark.read().json("people.json");
        
        // 2. Read people1.json
        Dataset<Row> people1DF = spark.read().json("people1.json");
        
        // 3. Union the two Datasets
        Dataset<Row> combinedDF = peopleDF.union(people1DF);
        
        // 4. Create a temporary view so it can be queried with SQL
        combinedDF.createOrReplaceTempView("people");
        
        // 5. Compute the average salary and show the result
        Dataset<Row> avgSalaryDF = spark.sql("SELECT AVG(CAST(salary AS DOUBLE)) FROM people");
        avgSalaryDF.show();
        /********* End *********/
    }
}
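The same average can also be computed with the DataFrame API instead of SQL. A minimal alternative to step 5, reusing combinedDF from above (the alias avg_salary is just an illustrative name):

        // Aggregate directly on the Dataset; avg() comes from org.apache.spark.sql.functions
        Dataset<Row> avgDF = combinedDF.agg(org.apache.spark.sql.functions.avg("salary").alias("avg_salary"));
        avgDF.show();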

1.2.4 Level 4: Reading Data Sources via JDBC

sparksql/src/main/java/com/educoder/bigData/sparksql2/Test4.java

package com.educoder.bigData.sparksql2;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.util.Properties;

public class Test4 {

    public static void case4(SparkSession spark) {
        
        /********* Begin *********/
        // 1. Read people.json
        Dataset<Row> peopleDF = spark.read().json("people.json");
        
        // 2. Read people1.json
        Dataset<Row> people1DF = spark.read().json("people1.json");
        
        // 3. Union the two Datasets
        Dataset<Row> combinedDF = peopleDF.union(people1DF);
        
        // 4. Configure the MySQL connection properties
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "root");
        connectionProperties.put("password", "123123");
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");
        
        // 5. Save the data to the people table in MySQL
        combinedDF.write()
            .mode(SaveMode.Overwrite)
            .jdbc("jdbc:mysql://localhost:3306/test", "people", connectionProperties);
        
        // 6. Read the data back from the people table in MySQL
        Dataset<Row> jdbcDF = spark.read()
            .jdbc("jdbc:mysql://localhost:3306/test", "people", connectionProperties);
        
        // 7. Show the first 20 rows
        jdbcDF.show();
        /********* End *********/
    }
}
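For reference, step 6 can also be written with the option-based DataFrameReader API. A sketch assuming the same URL, table and credentials as above:

        // Option-based JDBC read, equivalent to the jdbc(...) call in step 6
        Dataset<Row> jdbcDF2 = spark.read()
            .format("jdbc")
            .option("url", "jdbc:mysql://localhost:3306/test")
            .option("dbtable", "people")
            .option("user", "root")
            .option("password", "123123")
            .option("driver", "com.mysql.jdbc.Driver")
            .load();
        jdbcDF2.show();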

2 Spark Structured Stream Processing

2.1 SparkStreaming

2.1.1 Level 1: QueueStream

App/src/main/java/net/educoder/Step1.java

package net.educoder;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.api.java.JavaRDD;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;

public class Step1 {
    private static SparkConf conf;
    static {
        conf = new SparkConf().setMaster("local[*]").setAppName("Step1");
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true");
    }
    public static void main(String[] args) throws InterruptedException {
        /********** Begin ***********/
        //1. Initialize the JavaStreamingContext with a batch interval of Durations.seconds(1), i.e. one batch per second
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
        
        //2. Obtain the queue stream
        LinkedBlockingQueue<JavaRDD<String>> queue = QueueStream.queueStream(ssc);
        JavaDStream<String> lines = ssc.queueStream(queue);
        
        //3. Take the data from the queue stream and clean/transform it (per the requirements above)
        JavaDStream<String> processedData = lines.map(line -> {
            // Split on commas (the third field is wrapped in single quotes)
            String[] parts = line.split(",");
            
            // Parse the individual fields
            String ip = parts[0];
            long timestamp = Long.parseLong(parts[1]);
            
            // Handle the URL field (strip the single quotes)
            String urlInfo = parts[2].replace("'", "");
            
            // Target URL and status code
            String targetUrl = parts[3];
            String statusCode = parts[4];
            
            // 1. Convert the timestamp into the required date format
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String formattedDate = sdf.format(new Date(timestamp));
            
            // 2. Extract the start URL (split on spaces)
            String startUrl = urlInfo.split(" ")[1];
            
            // 3. Assemble the result record
            return "Ip:" + ip + ",visitTime:" + formattedDate + 
                   ",startUrl:" + startUrl + ",targetUrl:" + targetUrl + 
                   ",statusCode:" + statusCode;
        });

        //4. Check whether the RDD is empty; if it is, call ssc.stop(false, false) and System.exit(0), otherwise store the result data in the MySQL database
        processedData.foreachRDD(rdd -> {
            if (rdd.isEmpty()) {
                ssc.stop(false, false);
                System.exit(0);
            } else {
                JdbcTools.saveData(rdd.toLocalIterator());
            }
        });

        //5. Start Spark Streaming
        ssc.start();

        //6. Wait for the computation to finish
        ssc.awaitTermination();
        /********** End **********/
    }
}

2.1.2 Level 2: File Streams

Spark/src/main/java/SparkStreaming.java

package com.educoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.io.Serializable;
import java.sql.*;
import java.util.Arrays;
import java.util.Iterator;

public class SparkStreaming  {
    
    public static void main(String[] args) throws Exception {
        SparkConf conf=new SparkConf().setAppName("edu").setMaster("local");
        /********** Begin **********/
        //1. Initialize the StreamingContext with a batch interval of 1 second
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
        
        //2. Set up a file stream monitoring the directory /root/step11_fils
        JavaDStream<String> lines = ssc.textFileStream("/root/step11_fils");
        
        /* Data format: hadoop hadoop spark spark
         * Delimiter: space
         * Requirements:
         *   Accumulate the number of occurrences of each word across batches
         *   Write the results to MySQL
         *   If the word to be inserted already exists in the MySQL table, add the previously stored count to the current count before writing; otherwise insert it directly
         *   Database: educoder, table: step, word column: word, count column: count
         */
        
        //3. Clean and transform the data
        JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairDStream<String, Integer> wordcount = words.mapToPair(word -> new Tuple2<>(word, 1))
            .reduceByKey((a, b) -> a + b);
        
        //4. Write the results to MySQL
        wordcount.foreachRDD(rdd->{
            rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
                @Override
                public void call(Iterator<Tuple2<String, Integer>> iterator) throws Exception {
                    Connection connection = myconn();
                    Statement stmt = connection.createStatement();
                    
                    // Create table if not exists
                    stmt.execute("CREATE TABLE IF NOT EXISTS educoder.step (word VARCHAR(255) PRIMARY KEY, count INT)");
                    
                    while (iterator.hasNext()) {
                        Tuple2<String, Integer> record = iterator.next();
                        String word = record._1();
                        int count = record._2();
                        
                        // Check if word exists
                        ResultSet rs = stmt.executeQuery("SELECT count FROM educoder.step WHERE word = '" + word + "'");
                        if (rs.next()) {
                            // Update existing record
                            int existingCount = rs.getInt("count");
                            stmt.executeUpdate("UPDATE educoder.step SET count = " + (existingCount + count) + 
                                              " WHERE word = '" + word + "'");
                        } else {
                            // Insert new record
                            stmt.executeUpdate("INSERT INTO educoder.step VALUES('" + word + "', " + count + ")");
                        }
                        rs.close();
                    }
                    
                    stmt.close();
                    connection.close();
                }
            });
        });
        
        //5. Start Spark Streaming (the code after the End marker sleeps 15 seconds and then stops the context)
        ssc.start();
        /********** End **********/
        Thread.sleep(15000);
        ssc.stop();
        ssc.awaitTermination();
    }
    
    /**
     * Get a MySQL connection
     * @return
     */
    public static Connection myconn()throws SQLException,Exception{
        Class.forName("com.mysql.jdbc.Driver");
        Connection conn= DriverManager.getConnection("jdbc:mysql://localhost:3306/educoder","root","123123");
        return conn;
    }
}

2.1.3 Level 3: socketTextStream

SparkStreaming/src/main/java/JSocketSpark.java

package com;

import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.sql.Connection;
import java.sql.DriverManager;

public class JSocketSpark {
    public static void main(String[] args) throws InterruptedException{
        SparkConf conf = new SparkConf().setAppName("socketSparkStreaming").setMaster("local[*]");
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(2));
        /**********begin**********/
        //1. Connect to the socket stream, host: localhost, port: 5566
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 5566);   

        //2. Split each line into words and flatten
        JavaDStream<String> words = lines.flatMap(line -> {
            String cleanedLine = line.toLowerCase().replaceAll("[^a-zA-Z\\s]", "");
            return Arrays.stream(cleanedLine.split("\\s+"))
                        .filter(word -> !word.isEmpty())
                        .iterator();
        });


        //3. Assemble (word, 1) pairs
        JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));

        //4. Set the checkpoint directory (required by updateStateByKey)
        ssc.checkpoint("/root/check");
        //5. Accumulate each batch's counts onto the totals from previous batches; name the result reduced
        Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunc = 
            (values, state) -> {
                int currentSum = values.stream().mapToInt(Integer::intValue).sum();
                int previousSum = state.orElse(0);
                return Optional.of(currentSum + previousSum);
            };
        JavaPairDStream<String, Integer> reduced = pairs.updateStateByKey(updateFunc);


        //6. Write the results to MySQL
        //   Logic: if the word already exists, update its count;
        //          if it does not, insert it
        reduced.foreachRDD(rdd -> {
            rdd.foreachPartition(partition -> {
                Connection conn = null;
                try {
                    conn = myconn();
                    conn.setAutoCommit(false); // Use transactions
                    String sql = "INSERT INTO wordcount (word, wordcount) VALUES (?, ?) " +
                                "ON DUPLICATE KEY UPDATE wordcount = VALUES(wordcount)";
                    try (java.sql.PreparedStatement pstmt = conn.prepareStatement(sql)) {
                        while (partition.hasNext()) {
                            Tuple2<String, Integer> tuple = partition.next();
                            pstmt.setString(1, tuple._1());
                            pstmt.setInt(2, tuple._2());
                            pstmt.addBatch();
                        }
                        pstmt.executeBatch();
                        conn.commit();
                    }
                } catch (Exception e) {
                    if (conn != null) {
                        try {
                            conn.rollback();
                        } catch (Exception re) {
                            // Suppress rollback errors
                        }
                    }
                } finally {
                    if (conn != null) {
                        try {
                            conn.close();
                        } catch (Exception e) {
                            // Suppress close errors
                        }
                    }
                }
            });
        });
        
        /********** End **********/
        ssc.start();
        ssc.awaitTermination();
    }

    public static Connection myconn()throws Exception{
        Class.forName("com.mysql.jdbc.Driver");
        Connection conn= DriverManager.getConnection("jdbc:mysql://localhost:3306/edu","root","123123");
        return conn;
    }
}

2.1.4 Level 4: KafkaStreaming

App/src/main/java/net/educoder/Step2.java

package net.educoder;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import java.text.SimpleDateFormat;
import java.util.*;

public class Step2 {
    private static SparkConf conf;
    static {
        conf = new SparkConf().setMaster("local[*]").setAppName("Step2");
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true");
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "127.0.0.1:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "sparkStreaming");
        kafkaParams.put("enable.auto.commit", "false");
        TopicPartition topicPartition = new TopicPartition("test", 0);
        List<TopicPartition> topicPartitions = Arrays.asList(topicPartition);
        HashMap<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(topicPartition, 0l);
        
        /********** Begin **********/

        // Initialize the JavaStreamingContext
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Create the Kafka direct stream
        JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream(
            jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Assign(topicPartitions, kafkaParams, offsets)
        );

        // Transform the data
        JavaDStream<String> processed = directStream.map(record -> {
            String line = record.value();
            String[] parts = line.split(",");
            String ip = parts[0].trim();
            long timestamp = Long.parseLong(parts[1].trim());
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String visitTime = sdf.format(new Date(timestamp));
            String thirdField = parts[2].replaceAll("'", "").trim();
            String[] urlParts = thirdField.split("\\s+");
            String startUrl = urlParts.length >= 2 ? urlParts[1] : "";
            String targetUrl = parts[3].trim();
            String statusCode = parts[4].trim();
            return String.format("Ip:%s,visitTime:%s,startUrl:%s,targetUrl:%s,statusCode:%s",
                ip, visitTime, startUrl, targetUrl, statusCode);
        });

        // Process each RDD
        processed.foreachRDD(rdd -> {
            if (rdd.isEmpty()) {
                jssc.stop(false, false);
                System.exit(0);
            } else {
                rdd.foreachPartition(partition -> {
                    JdbcTools.saveData2(partition);
                });
            }
        });

        // Start and wait for termination
        jssc.start();
        jssc.awaitTermination();

        /********** End **********/
    }
}
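Since enable.auto.commit is set to false, offsets are never committed back to Kafka in this solution, which is fine for the exercise. For completeness, a sketch of the manual commit pattern from the spark-streaming-kafka-0-10 integration; it assumes the additional imports org.apache.spark.streaming.kafka010.HasOffsetRanges, org.apache.spark.streaming.kafka010.OffsetRange and org.apache.spark.streaming.kafka010.CanCommitOffsets, and would sit alongside the existing foreachRDD, before jssc.start():

        // Commit the processed offset ranges back to Kafka after each batch
        directStream.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            ((CanCommitOffsets) directStream.inputDStream()).commitAsync(offsetRanges);
        });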

Solutions to the Difficult Parts

A lot of time went into problems 2.1.3 and 2.1.4.

On the OJ platform you inevitably run into odd issues; to understand the requirements fully and implement them smoothly, you need to figure out how the program is actually executed.

Take the SparkStreaming task as an example.

1.sh

#!/bin/bash
cd SparkStreaming
mysql -uroot -p123123 -e'DROP database IF EXISTS `edu`'
mysql -uroot -p123123 < wordcount.sql
rm -rf /root/files
rm -rf /root/SparkStreaming-1.0-SNAPSHOT.jar
rm -rf /root/socket.jar
rm -rf /root/check
mkdir /root/check
mvn clean package -o >/dev/null 2>&1 
cp target/SparkStreaming-1.0-SNAPSHOT.jar /root
cp ./socket.jar /root
mkdir /root/files
cp files/wordcount.txt /root/files/
nohup java -jar /root/socket.jar >/dev/null 2>&1 &
sleep 3s
nohup /opt/spark/dist/bin/spark-submit --class com.JSocketSpark /root/SparkStreaming-1.0-SNAPSHOT.jar >/dev/null 2>&1 &
sleep 15s
mysql -uroot -p123123 -e'select * from edu.wordcount order by wordcount desc' | sed -n '2,6p'
ps -ef | grep SparkSubmit | grep -v grep | awk '{print $2}' | xargs kill -9 >/dev/null 2>&1 

From the script you can infer the input files and parameters and develop against that; a hypothetical stand-in for socket.jar is sketched below.
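socket.jar itself is a black box, but 1.sh and JSocketSpark together suggest what it does: serve the lines of /root/files/wordcount.txt over port 5566. A minimal stand-in for local testing, built purely on that assumption (every detail below is inferred, nothing is taken from the real socket.jar):

import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical stand-in for socket.jar: serves /root/files/wordcount.txt
// line by line to the first client that connects on port 5566.
public class FakeSocketServer {
    public static void main(String[] args) throws Exception {
        List<String> lines = Files.readAllLines(Paths.get("/root/files/wordcount.txt"));
        try (ServerSocket server = new ServerSocket(5566);
             Socket client = server.accept();
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            for (String line : lines) {
                out.println(line);   // one record per line, as socketTextStream expects
                Thread.sleep(500);   // pace the stream so several micro-batches receive data
            }
        }
    }
}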

This technical content is for learning and exchange purposes only. If you have any questions, please contact QQ 2014160588 and state your purpose. Please make sure to comply with the relevant laws and regulations when using it. The author assumes no responsibility for any direct or indirect losses caused by the use of this content; users bear all risks and responsibilities arising from its use. Do not use this content for any illegal purpose.