Spark Streaming Real-Time Data Statistics (An Application of Accumulators)

If any missing code keeps this from running, please point it out in a comment and I will fill it in ❤️

Scenario

Read real-time data from Kafka, clean and filter it, merge and deduplicate it against the current day's historical data, then aggregate the merged data set. Write the aggregated results to HBase, and when the clock rolls over into the next day, clear the previous day's history and start counting afresh.

Implementation Logic

  1. Use Spark Streaming to read the real-time data stream from Kafka and create a DStream.
  2. Filter out the records that meet the requirements and build a DStream[k, v] (note: k is the record's unique key, v is the detailed record).
  3. Use the mapWithState method of DStream[k, v] to produce the deduplicated data set.
  4. Call awaitTerminationOrTimeout(time) on the StreamingContext to set a termination time, so that all of the above DStream computation stops at midnight.
  5. Call stop on the StreamingContext to terminate it. By default stop also terminates the SparkContext; passing stop(stopSparkContext = false, stopGracefully = true) keeps the SparkContext alive and lets the batches the StreamingContext has already received finish before shutting down (see the sketch below).
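
A minimal sketch of steps 4 and 5, assuming a JavaStreamingContext named jssc and a millisUntilMidnight value computed elsewhere (the full job below computes it in getNeedRunTime()):

// Run until the next midnight, then stop only the StreamingContext:
// keep the SparkContext alive and let already-received batches finish gracefully.
jssc.start();
jssc.awaitTerminationOrTimeout(millisUntilMidnight);
jssc.stop(false, true); // stopSparkContext = false, stopGracefully = true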

Java Code

RealStatStreaming.java

import kafka.utils.ZKGroupTopicDirs;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import scala.Tuple2;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

// Constants such as KAFKA_ROOT_PATH, CHECK_POINT_DIR, REALSTAT_GROUP_ID, CONSUMER_TOPIC_NAME,
// KAFKA_QUORUM, AUTO_OFFSET_RESET, ZK_SPLIT and COLUMN_FAMILY come from GlobalConfig (listed below);
// the package name is assumed from the ZookeeperFactory import.
import static com.wangdian.spark.tasks.system.GlobalConfig.*;

/**
 * Real-time statistics job.
 * @date 2019-09-03 17:57:38
 * @author Ludengke
 */
public final class RealStatStreaming {
    private static final String OFFSET_DIR = KAFKA_ROOT_PATH.concat(new ZKGroupTopicDirs(REALSTAT_GROUP_ID, CONSUMER_TOPIC_NAME).consumerOffsetDir());

    private static SparkSession sparkSession = null;
    private static JavaStreamingContext sc = null;

    public static void main(String[] args) throws Exception {

        SparkConf sparkConf = SparkFactory.getDefaultSparkConf()
                .set("spark.sql.shuffle.partitions","24")
                .setAppName("RealStatStreaming");

        sparkSession = SparkSession.builder()
                .config(sparkConf)
                .getOrCreate();


        // Build the StreamingContext from the Spark configuration.
        /**
         * There are two ways to create it: if the checkpoint directory has content,
         * restart from the last checkpoint; otherwise create a new context.
         * After the code is recompiled, the checkpoint must be deleted.
         */
        sc = JavaStreamingContext.getOrCreate(CHECK_POINT_DIR, (Function0<JavaStreamingContext>) () -> {
            sc = new JavaStreamingContext(JavaSparkContext.fromSparkContext(sparkSession.sparkContext()), Durations.seconds(REALSTAT_DURATIONS_SECOND));
            sc.checkpoint(CHECK_POINT_DIR);
            return sc;
        });

        // Kafka consumer configuration
        Map<String, Object> kafkaParams = new HashMap<>(16);
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_QUORUM);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, REALSTAT_GROUP_ID);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        RealStatStreaming.work(kafkaParams);


        sc.start();
        sc.awaitTerminationOrTimeout(getNeedRunTime());
        sc.stop();
    }

    /**
     * Compute how long the real-time statistics job needs to run:
     * midnight of the next day minus the current time.
     * @return milliseconds until the next midnight
     * @throws ParseException
     */
    private static long getNeedRunTime() throws ParseException {
        SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date now = new Date();
        String tomorrowMidnight = SystemUtils.getDateAddDays(sdfDate.format(now).substring(0,10),1)+ " 00:00:00";
        Date tomorrow = sdfDate.parse(tomorrowMidnight);
        return tomorrow.getTime()-now.getTime();
    }


    private static void work(Map<String, Object> kafkaParams) {

        // Create the input stream from the Kafka configuration and the StreamingContext.
        JavaInputDStream<ConsumerRecord<String, String>> stream = RealStatStreaming.getStreaming(sc,kafkaParams);

        // Extract the value of each Kafka record.
        JavaDStream<String> lines = stream.map(ConsumerRecord::value);
        /**
         * FormatData converts the records into <key, bean> pairs and filters them.
         * mapWithState merges the current batch with the historical data and deduplicates it.
         * stateSnapshots returns the full state; without it only this batch's values are emitted.
         * statAndSave aggregates the raw records and writes the result to HBase.
         *
         * PS: If the resident in-memory state needs initial values, use
         * StateSpec.function(update function).initialState(initial RDD).
         */
        SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd");
        String date = sdfDate.format(new Date());
        RealStatStreaming.statAndSave(RealStatStreaming.FormatData(lines,date).mapWithState(StateSpec.function(RealStatStreaming::mappingFunc)).stateSnapshots(),date);

        // Update the offsets stored in Zookeeper.
        stream.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            for (OffsetRange o : offsetRanges) {
                ZookeeperFactory.getZkUtils().updatePersistentPath(
                        String.join(ZK_SPLIT, OFFSET_DIR, String.valueOf(o.partition())),
                        String.valueOf(o.fromOffset()),
                        ZookeeperFactory.getZkUtils().DefaultAcls()
                );
                SystemUtils.info("UPDATE OFFSET WITH [ topic :" + o.topic() + " partition :" + o.partition() + " offset :" + o.fromOffset() + " ~ " + o.untilOffset() + " ]");
            }
        });
    }

    /**
     * Build the DStream from the StreamingContext and the Kafka configuration.
     */
    private static JavaInputDStream<ConsumerRecord<String, String>> getStreaming(JavaStreamingContext context, Map<String, Object> kafkaParams) {
        // Make sure the offset storage path exists.
        if (!ZookeeperFactory.getZkClient().exists(OFFSET_DIR)) {
            ZookeeperFactory.getZkClient().createPersistent(OFFSET_DIR, true);
        }
        List<String> children = ZookeeperFactory.getZkClient().getChildren(OFFSET_DIR);

        if (children != null && !children.isEmpty()) {
            Map<TopicPartition, Long> fromOffsets = new HashMap<>(children.size());
            // Offsets were found in Zookeeper: use them to create the stream.
            for (String child : children) {
                long offset = Long.valueOf(ZookeeperFactory.getZkClient().readData(String.join(ZK_SPLIT, OFFSET_DIR, child)));
                fromOffsets.put(new TopicPartition(CONSUMER_TOPIC_NAME, Integer.valueOf(child)), offset);
                SystemUtils.info("FOUND OFFSET IN ZOOKEEPER, USE [ partition :" + child + " offset :" + offset + " ]");
            }
            SystemUtils.info("CREATE DIRECT STREAMING WITH CUSTOMIZED OFFSET..");
            return KafkaUtils.createDirectStream(
                    context,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Assign(new HashSet<>(fromOffsets.keySet()), kafkaParams, fromOffsets)
            );
        } else {
            // No offsets stored in Zookeeper: create the stream with the default offsets.
            SystemUtils.info("NO OFFSET FOUND, CREATE DIRECT STREAMING WITH DEFAULT OFFSET..");
            return KafkaUtils.createDirectStream(
                    context,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.Subscribe(Collections.singleton(CONSUMER_TOPIC_NAME), kafkaParams)
            );
        }
    }


    /**
     * Filter the records to be counted according to the given date.
     * @param lines the stream to process
     * @param date the date used as the filter condition
     * @return
     */
    private static JavaPairDStream<String,ApiTrade> FormatData (JavaDStream<String> lines,String date) {
        return lines.mapPartitionsToPair(RealStatStreaming::JsonToPairTradeBean).filter((Function<Tuple2<String, ApiTrade>, Boolean>) line->{
            if (line._2==null|| line._2.getD()==null || line._2.getE()==null){
                return false;
            }
            if (date.equals(line._2.getD().substring(0,10)) || date.equals(line._2.getE().substring(0,10))){
                return true;
            }else {
                return false;
            }
        });
    }


    /**
     * Aggregate the resident in-memory state snapshot and write the result to HBase.
     * @param lines snapshot of the resident in-memory state
     * @param date row key of the HBase table
     */
    private static void statAndSave(JavaPairDStream<String,ApiTrade> lines,String date) {
        lines.foreachRDD(tmp->{
            JavaRDD<ApiTrade> apiTrade = tmp.map((Function<Tuple2<String, ApiTrade>, ApiTrade>) v1 -> {
                return v1._2;
            });
            Dataset<ApiTrade> tradeData = sparkSession.createDataset(apiTrade.rdd(), Encoders.bean(ApiTrade.class));
            tradeData.createOrReplaceTempView("data");
            String selectSql = ", count(1) as count, sum(g) as money";
            String groupSql = " group by b";
            Dataset<Row> allStatData = sparkSession.sql(String.join(" ", "select ", "b", selectSql, " from data", groupSql));
            /**
             * Create the HBase table; the call checks whether the table already exists.
             */
            HBaseFactory.createTables("daily_total_stat",500);

            /**
             * Write the total statistics to HBase.
             */
            allStatData.rdd().toJavaRDD().foreach(line->{
                Put put = new Put(Bytes.toBytes(date));
                put.addColumn(
                        Bytes.toBytes(COLUMN_FAMILY),
                        Bytes.toBytes("count"),
                        Bytes.toBytes(line.getAs("count").toString()));
                put.addColumn(
                        Bytes.toBytes(COLUMN_FAMILY),
                        Bytes.toBytes("money"),
                        Bytes.toBytes(line.getAs("money").toString()));
                HBaseFactory.writeToHBase("daily_total_stat",put);
            });
        });
    }

    /**
     * Convert the JSON records of one partition into beans.
     * @param s iterator over one partition's records
     * @return
     */
    private static Iterator<ApiTrade> JsonToTradeBean(Iterator<String> s){
        ArrayList<ApiTrade> tmp = new ArrayList<>();
        while (s.hasNext()) {
            ApiTrade apiTrade = SystemUtils.LOWER_CASE_WITH_UNDERSCORES_GSON.fromJson(s.next(), ApiTrade.class);
            tmp.add(apiTrade);
        }
        return tmp.iterator();
    }

    /**
     * Convert the JSON records of one partition into <key, bean> pairs.
     * @param s iterator over one partition's records
     * @return
     */
    private static Iterator<Tuple2<String,ApiTrade>> JsonToPairTradeBean(Iterator<String> s){
        ArrayList<Tuple2<String,ApiTrade>> tmp = new ArrayList<>();
        while (s.hasNext()) {
            ApiTrade apiTrade = SystemUtils.LOWER_CASE_WITH_UNDERSCORES_GSON.fromJson(s.next(), ApiTrade.class);
            tmp.add(new Tuple2<String,ApiTrade>(MD5Utils.encode(apiTrade.getA() + apiTrade.getB() + apiTrade.getC()), apiTrade));
        }
        return tmp.iterator();
    }


    /**
     * Merge the current record and the historical record of a key into a new value for that key.
     * @param key key of the record
     * @param one current record for the key
     * @param curState historical record (state) for the key
     * @return
     */
    private static Tuple2<String,ApiTrade> mappingFunc (String key, Optional<ApiTrade> one, State<ApiTrade> curState){
        // Check whether the current batch contains a value for this key.
        if (one.isPresent()) {
            // Take the value from the current batch.
            ApiTrade oneTrade = one.get();
            // If no historical value exists, store the new one; otherwise decide whether to update.
            if (curState.exists()) {
                // If the historical value is null, or the current record's modification time is later, update the state.
                ApiTrade curStateTrade = curState.getOption().isEmpty()?null:curState.getOption().get();
                if(curStateTrade==null || oneTrade.getF().compareTo(curStateTrade.getF())>0){
                    curState.update(oneTrade);
                }
            } else {
                curState.update(oneTrade);
            }
        }
        return new Tuple2<>(key,curState.get());
    }
}
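
As the comment in work() notes, the resident state can be seeded with initial values via initialState. A minimal sketch of such a helper, assuming it lives inside RealStatStreaming and that historyRdd (a JavaPairRDD<String, ApiTrade>, e.g. re-read from HBase on a cold start) is supplied by the caller; it is not part of the original job:

// Requires: import org.apache.spark.api.java.JavaPairRDD;
private static JavaPairDStream<String, ApiTrade> dedupWithInitialState(
        JavaDStream<String> lines, String date, JavaPairRDD<String, ApiTrade> historyRdd) {
    // Same mapping function as above, but the state starts from historyRdd instead of empty.
    return FormatData(lines, date)
            .mapWithState(StateSpec.function(RealStatStreaming::mappingFunc).initialState(historyRdd))
            .stateSnapshots();
}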

SystemUtils.java

import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;

/**
 * @author ludengke
 * @date 2019/9/11
 **/
public class SystemUtils {
    /**
     * Shift a date by the given number of days.
     */
    public static String getDateAddDays(String date ,Integer count) throws ParseException {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd" );
        Calendar cal = Calendar.getInstance();
        cal.setTime(format.parse(date));
        cal.add(Calendar.DATE, count);
        return format.format(cal.getTime());
    }

    /**
     * Minimal stand-ins for the info/error helpers used elsewhere in this post
     * (the original project presumably wraps a real logger).
     */
    public static void info(String message) {
        System.out.println(message);
    }

    public static void error(String message, Throwable t) {
        System.err.println(message);
        t.printStackTrace();
    }

    public static final Gson DEFAULT_GSON = new GsonBuilder().create();
    public static final Gson LOWER_CASE_WITH_UNDERSCORES_GSON = new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create();
}

SparkFactory.java

import org.apache.spark.SparkConf;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.sql.Row;

/**
 * @author ludengke
 * @date 2019/9/11
 **/
public class SparkFactory {
    /**
     * Common configuration shared by all jobs.
     *
     * @desc https://spark.apache.org/docs/latest/configuration.html
     */
    public static SparkConf getDefaultSparkConf() {
        return new SparkConf()
                .set("spark.shuffle.file.buffer", "1024k")
                .set("spark.reducer.maxSizeInFlight", "128m")
                .set("spark.shuffle.memoryFraction", "0.3")
                .set("spark.streaming.stopGracefullyOnShutdown", "true")
                .set("spark.streaming.kafka.maxRatePerPartition", "300")
                .set("spark.serializer", KryoSerializer.class.getCanonicalName())
                .registerKryoClasses(new Class[]{Row.class,Object.class,ApiTrade.class});
    }
}

GlobalConfig.java

/**
 * @author ludengke
 * @date 2019/9/11
 **/
public class GlobalConfig {
    /**
     * Root path of the Kafka configuration in Zookeeper.
     */
    public static final String KAFKA_ROOT_PATH = "/kafka";
    /**
     * Checkpoint output directory, on HDFS.
     */
    public static final String CHECK_POINT_DIR = "/user/hdfs/RealStatStreamingCheckpoint";

    /**
     * Consumer group id for the real-time statistics job.
     */
    public static final String REALSTAT_GROUP_ID = "realstat";

    /**
     * Streaming batch interval for the real-time statistics job.
     */
    public static final long REALSTAT_DURATIONS_SECOND = 60L;

    /**
     * Kafka brokers.
     */
    public static final String KAFKA_QUORUM = "kafka1:9092,kafka2:9092,kafka3:9092";

    /**
     * Kafka offset reset policy.
     */
    public static final String AUTO_OFFSET_RESET = "earliest";

    /**
     * Zookeeper path separator.
     */
    public static final String ZK_SPLIT = "/";

    /**
     * Zookeeper quorum.
     */
    public static final String ZOOKEEPER_QUORUM = "kafka1:2181,kafka2:2181,kafka3:2181";

    /**
     * HBase column family name.
     */
    public static final String COLUMN_FAMILY = "default";

    /**
     * The constants below are referenced elsewhere but were missing from the original
     * listing; the values are placeholders, adjust them to your environment.
     */
    public static final String CONSUMER_TOPIC_NAME = "api_trade";
    public static final int ZOOKEEPER_CONNECTION_TIMEOUT = 30000;
    public static final String HBASE_ZNODE_PARENT = "/hbase";
}

MD5Utils.java

import java.security.MessageDigest;

/**
 * @author : LiuWeidong
 * @date : 2018/12/29.
 */

public class MD5Utils {

    private static final String[] HEX_DIG_ITS = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"};

    public static String encode(String origin) {
        String resultString = null;
        try {
            resultString = origin;
            MessageDigest md = MessageDigest.getInstance("MD5");
            resultString = byteArrayToHexString(md.digest(resultString.getBytes()));
        } catch (Exception ignored) {
        }
        return resultString;
    }


    private static String byteArrayToHexString(byte[] b) {
        StringBuilder resultSb = new StringBuilder();
        for (byte aB : b) {
            resultSb.append(byteToHexString(aB));
        }
        return resultSb.toString();
    }

    private static String byteToHexString(byte b) {
        int n = b;
        if (n < 0) {
            n += 256;
        }
        int d1 = n / 16;
        int d2 = n % 16;
        return HEX_DIG_ITS[d1] + HEX_DIG_ITS[d2];
    }
}

ApiTrade.java

import java.io.Serializable;

/**
 * @author ldk
 */
public class ApiTrade implements Serializable {

    private String a;
    private String b;
    private String c;
    private String d;
    private String e;
    private String f;
    private Integer g;

    public String getA() {
        return a;
    }

    public void setA(String a) {
        this.a = a;
    }

    public String getB() {
        return b;
    }

    public void setB(String b) {
        this.b = b;
    }

    public String getC() {
        return c;
    }

    public void setC(String c) {
        this.c = c;
    }

    public String getD() {
        return d;
    }

    public void setD(String d) {
        this.d = d;
    }

    public String getE() {
        return e;
    }

    public void setE(String e) {
        this.e = e;
    }

    public String getF() {
        return f;
    }

    public void setF(String f) {
        this.f = f;
    }

    public Integer getG() {
        return g;
    }

    public void setG(Integer g) {
        this.g = g;
    }
}

ZookeeperFactory.java

import com.wangdian.spark.tasks.system.GlobalConfig;
import com.wangdian.spark.tasks.utils.serializer.CustomSerializer;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

/**
 * @author : ldk
 * @date : 2019/1/24.
 */
public class ZookeeperFactory {

    private static final ZkConnection ZK_CONNECTION = new ZkConnection(GlobalConfig.ZOOKEEPER_QUORUM);
    private static final ZkClient ZK_CLIENT = new ZkClient(getZkConnection(), GlobalConfig.ZOOKEEPER_CONNECTION_TIMEOUT, new CustomSerializer());
    private static final ZkUtils ZK_UTILS = new ZkUtils(getZkClient(), getZkConnection(), false);

    public static ZkConnection getZkConnection() {
        return ZK_CONNECTION;
    }

    public static ZkClient getZkClient() {
        return ZK_CLIENT;
    }

    public static ZkUtils getZkUtils() {
        return ZK_UTILS;
    }

    private ZookeeperFactory() {
    }
}

HBaseFactory.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

/**
 * @author : ldk
 * @date : 2019/1/22.
 */
public class HBaseFactory {

    private static Connection conn = null;
    private static Configuration conf = null;

    public static Configuration getHBaseConf() {
        if (conf == null) {
            conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", GlobalConfig.ZOOKEEPER_QUORUM);
            conf.set("zookeeper.znode.parent", GlobalConfig.HBASE_ZNODE_PARENT);
        }
        return conf;
    }

    public static Connection createHBaseConn() {
        if (conn == null || conn.isClosed()) {
            try {
                conn = ConnectionFactory.createConnection(getHBaseConf());
            } catch (IOException e) {
                SystemUtils.error("Failed to create HBase connection: ", e);
            }
        }
        return conn;
    }

    public static synchronized void createTables(String tableName,Integer version) {
        try (Admin admin = HBaseFactory.createHBaseConn().getAdmin()) {
            if (admin.tableExists(TableName.valueOf(tableName))) {
                return;
            }
            TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
            builder.setColumnFamily(
                    ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(GlobalConfig.COLUMN_FAMILY))
                            /**
                             * Enable compression.
                             */
                            .setCompressionType(Compression.Algorithm.SNAPPY)
                            /**
                             * Set the maximum number of stored versions.
                             */
                            .setMaxVersions(version)
                            .build()
            );
            if (!admin.tableExists(TableName.valueOf(tableName))) {
                admin.createTable(builder.build());
            }
        } catch (Exception e) {
            SystemUtils.error("Failed to create HBase table: " + tableName, e);
        }
    }

    public static boolean isTableExist(String tableName) {
        try (Admin admin = HBaseFactory.createHBaseConn().getAdmin()) {
            return admin.tableExists(TableName.valueOf(tableName));
        } catch (Exception e) {
            SystemUtils.error("Failed to check HBase table status: " + tableName, e);
        }
        return false;
    }

    public static void writeToHBase(String tableName, Put put) {
        Table table = null;
        try {
            table = HBaseFactory.createHBaseConn().getTable(TableName.valueOf(tableName));
            table.put(put);
        } catch (Exception e) {
            SystemUtils.error("Failed to write to HBase, TABLE NAME :" + "[ " + tableName + " ]", e);
        }
        finally {
            try {
                if(table!=null){
                    table.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void writeToHBase(String tableName, List<Put> puts) {
        Table table = null;
        try {
            table = HBaseFactory.createHBaseConn().getTable(TableName.valueOf(tableName));
            table.put(puts);
        } catch (Exception e) {
            SystemUtils.error("Failed to write to HBase, TABLE NAME :" + "[ " + tableName + " ]", e);
        }
        finally {
            try {
                if(table!=null){
                    table.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Run Screenshot

[Screenshot: 20190911182414.png]

PS: You can ignore the runtime of the last job; this screenshot is from an abnormal streaming run (I did not capture a normal one).
The in-memory persisted state RDD is consolidated once every ten batches: a normal batch runs two jobs, while every tenth batch runs three, the extra job being the one that consolidates (checkpoints) the in-memory persisted RDD.

If you are curious, it is worth looking into why some of these jobs contain skipped stages.
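
The state DStream produced by mapWithState is checkpointed every 10 batch intervals by default, which is where that extra job on every tenth batch comes from. If you want a different cadence, a minimal sketch (applied before stateSnapshots(); the 5-batch interval is just an example, not part of the original job):

// Requires: import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
JavaMapWithStateDStream<String, ApiTrade, ApiTrade, Tuple2<String, ApiTrade>> stateStream =
        FormatData(lines, date).mapWithState(StateSpec.function(RealStatStreaming::mappingFunc));
// Checkpoint the resident state every 5 batches instead of the default 10.
stateStream.checkpoint(Durations.seconds(5 * REALSTAT_DURATIONS_SECOND));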

Summary

This job places certain demands on the cluster: the data being counted is kept in memory, which makes the computation fast but requires a lot of memory. Keeping only the aggregated results in memory would be considerably smaller, but because of specific business requirements, mapWithState is used here as a deduplication function.
The following example keeps the aggregated results in memory instead: Spark Streaming real-time statistics of a merchant's cumulative PV traffic for the current day.
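
For comparison, a minimal sketch of the keep-only-the-aggregate approach: the state per key is just a running count, so the resident memory stays small. The key and the per-batch counts are assumptions for illustration, not the code of the linked article:

// Input: a JavaPairDStream<String, Long> of per-batch counts per key (e.g. from mapToPair + reduceByKey).
// State: the running total per key; the stream emits the updated total.
private static Tuple2<String, Long> countingFunc(String key, Optional<Long> batchCount, State<Long> total) {
    long updated = total.exists() ? total.get() : 0L;
    if (batchCount.isPresent()) {
        updated += batchCount.get();
    }
    total.update(updated);
    return new Tuple2<>(key, updated);
}
// Usage (hypothetical): pairCounts.mapWithState(StateSpec.function(RealStatStreaming::countingFunc));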


Title: Spark Streaming Real-Time Data Statistics (An Application of Accumulators)
Author: ludengke95
Link: http://xvhi.ludengke95.xyz/articles/2019/09/11/1568198286021.html