定制Flink输出的parquet文件名

问题描述

使用Flink将kafka等数据源的数据流,经过处理后输出到文件,我们一般是这样写代码的:

Schema avroSchema = ...;
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("my/base/path"), ParquetAvroWriters.forGenericRecord(avroSchema))
        .withBucketAssigner(new MyBucketAssigner())
        .build();

实际使用时发现,生成的parquet文件名称是"part-1-2"这种格式的,且没有扩展名。由于我们的应用对parquet文件名有一定的规范且文件名里包含一些实用信息(例如文件里的记录条数),所以这样是不能满足我们要求的。

然而flink里这个文件名的规则是写死在Bucket.java里的无法修改,只能寻找变通的方法来解决。

解决方法

StreamingFileSink.forBulkFormat()的第二个参数是一个Factory,用于创建BulkWriter,我们可以从这里入手,注入自定义的BulkWriter,在写入文件的时候修改parquet文件名。

以下是若干个相关类,经实测通过。这个方案最大的问题是需要通过反射获取targetFile文件名,所以有可能在未来的flink版本里失效。

StreamingFileSink:

Schema avroSchema = ...;
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("my/base/path"), new MyParquetWriterFactory(new MyParquetBuilder()))
        .withBucketAssigner(new MyBucketAssigner())
        .build();

MyParquetWriterFactory:

static class MyParquetWriterFactory extends ParquetWriterFactory {
    public MyParquetWriterFactory(ParquetBuilder writerBuilder) {
        super(writerBuilder);
    }

    @Override
    public BulkWriter create(FSDataOutputStream stream) throws IOException {
        BulkWriter writer = super.create(stream);
        return new MyParquetBulkWriter(writer, stream);
    }
}

MyParquetBulkWriter:

/**
 * 包装flink的ParquetBulkWriter,修改part文件名格式
 */
static class MyParquetBulkWriter implements BulkWriter<GenericRecord> {
    private FSDataOutputStream stream;
    private BulkWriter writer;
    private int rowCount;

    public MyParquetBulkWriter(BulkWriter writer, FSDataOutputStream stream) {
        this.writer = writer;
        this.stream = stream;
    }

    @Override
    public void addElement(GenericRecord element) throws IOException {
        writer.addElement(element);
        rowCount++; //记录计数,结果将作为文件名的一部分
    }

    @Override
    public void flush() throws IOException {
        writer.flush();
    }

    @Override
    public void finish() throws IOException {
        // 试图在finish()后改名失败,因为finish()后正式文件并没有生成
        // 通过反射直接修改stream里的targetFile名称可行
        // 这里是修改part文件名的关键部分
        try {
            Field field = stream.getClass().getDeclaredField("targetFile");
            field.setAccessible(true);
            File targetFile = (File) field.get(stream);
            File renamedTargetFile = new File(targetFile.getParent(), rowCount + "_" + System.currentTimeMillis() + ".parquet");
            field.set(stream, renamedTargetFile);
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } finally {
            writer.finish();
        }
    }
}

MyParquetBuilder(其中avroSchema是在外部赋值的):

static class MyParquetBuilder implements ParquetBuilder {
    @Override
    public ParquetWriter createWriter(OutputFile outputFile) throws IOException {
        return 
        ParquetWriter.builder(outputFile).withSchema(avroSchema).withDataModel(GenericData.get()).build();
    }
}

参考资料

Flink streaming - Change part file names when using StreamingFileSink?

使用Apache Flink处理Kafka数据流

安装和启动kafka

kafka官网下载并解压到本地即可。

建议:如果从远程访问这个kafka,需要修改config/server.properties里的listeners属性为实际ip地址,否则producer发送数据时会提示“Connection to node 0 (localhost/127.0.0.1:9092) could not be established”:

listeners=PLAINTEXT://<ipaddress>:9092

启动kafka(如果是windows环境,将bin改为bin\windows,.sh改为.bat):

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

安装flink

flink官网下载压缩包,解压到本地即可。

启动flink:

bin/start-cluster

启动后访问 localhost:8081 可打开Flink Web Dashboard:

file

创建flink项目

用maven自动创建项目框架,这一步根据网络情况可能比较慢,耐心等待10分钟左右:

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.9.0 -DgroupId=com.test -DartifactId=flink -Dversion=1.0.0 -Dpackage=com.test -DinteractiveMode=false

在生成的pom.xml里添加flink-kafka-connector依赖(注意scala版本要与下载的kafka的scala版本一致):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

我们要处理流式数据,因此在生成的StreamingJob.java基础上修改。

public static void main(String[] args) throws Exception {
    // set up the streaming execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "10.1.10.76:9092");
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("flink_test", new SimpleStringSchema(), props);
    DataStream<String> stream = env.addSource(consumer);

    StreamingFileSink<String> sink = StreamingFileSink
            .forRowFormat(new Path("c:/temp/kafka-loader"), new SimpleStringEncoder<String>())
            .withBucketAssigner(new MyBucketAssigner())
            .build();
    stream.addSink(sink);

    // execute program
    env.execute("Flink Streaming Java API Skeleton");
}

static class MyBucketAssigner implements BucketAssigner<String, String> {
    @Override
    public String getBucketId(String element, Context context) {
        return "" + element.charAt(0);
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

运行flink作业

方式1:在IDE里直接运行main()方法即可,此处不细述。

方式2:mvn clean package打成jar包,然后打开localhost:8081控制台页面,选择左侧的Submit New Job菜单上传生成的jar包(按前述maven生成的pom.xml配置会自动生成两个jar包,要上传其中比较大的那个fatjar包)。上传成功后点击jar包,再点击Submit按钮即可。

测试发送数据

bin\windows\kafka-console-producer --broker-list <ipaddress>:9092 --topic flink_test

随机输入一些字符串并按回车键,在/tmp/kafka-loader目录下,应该会按字符串首字母生成相应的目录,里面的文件内容是所输入的字符串。

一些坑

FlinkKafkaConsumer

FlinkKafkaConsumer()的构造方法里,第二个参数可以是DeserializationSchema类型,也可以是KafkaDeserializationSchema类型,之前为了将flink-connector-kafka里自带的JSONKeyValueDeserializationSchema等(详见链接)转为前者找了半天,其实不用转直接用就可以。

JSONKeyValueDeserializationSchema

JSONKeyValueDeserializationSchema适合kafka消息内容为json格式的,如果不是json格式,比如是逗号分隔的格式,还是自己实现KafkaDeserializationSchema,并不复杂。比如之前我用SimpleStringSchema无法获取到消息里的key信息,就需要用flink-connector-kafka提供的Deser:

static class MyKeyedDeserializationSchema implements KafkaDeserializationSchema<String> {
    @Override
    public boolean isEndOfStream(String s) {
        return false;
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        return consumerRecord.key() + "," + consumerRecord.value();  // key()是repo名称,将其插入消息体以便后续处理
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

Part文件名

单写了一篇:定制Flink输出的parquet文件名

参考资料:

使用 Apache Flink 开发实时 ETL
Flink Kafka Connector

CentOS7.0安装mysql5.7

CentOS7.0里的yum默认不带mysql最新版本(其默认数据库是mariaDB,可替代mysql),所以如果要安装mysql需要参考A Quick Guide to Using the MySQL Yum Repository这个文档。

首先安装rpm源:

wget https://repo.mysql.com//mysql80-community-release-el7-3.noarch
sudo rpm -Uvh mysql80-community-release-el7-3.noarch.rpm

然后要选择安装哪个版本,默认是最新版本(8.0),我们希望安装5.7:

sudo yum-config-manager --disable mysql80-community
sudo yum-config-manager --enable mysql57-community

检验版本配置成功:

yum repolist enabled | grep mysql

然后执行安装命令:

sudo yum install mysql-community-server

启动mysqld服务:

sudo service mysqld start

mysql安装后会生成一个随机的root用户密码,用下面的命令获取到:

sudo grep 'temporary password' /var/log/mysqld.log

修改root用户密码:

ALTER USER 'root'@'localhost' IDENTIFIED BY 'MyNewPass4!';

使用Apache Drill处理数据文件

本文针对drill版本1.8。

安装Drill

官网下载tar包并解压即可,linux和windows都是如此。
注意:drill要求java版本是8或以上。

命令行使用Drill

最简单的方式是用embedded模式启动drill(启动后可以在浏览器里访问 http://localhost:8047 来管理drill):

bin/drill-embedded

这样就以嵌入模式启动了drill服务并同时进入了内置的sqline命令行。如果drill服务已经启动,则可以这样进入sqline命令行(参考):

bin/sqline -u jdbc:drill:drillbit=localhost

作为例子,用SQL语法查询一下drill自带的数据(命令行里的cp表示classpath,注意查询语句最后要加分号结尾):

apache drill> SELECT * FROM cp.`employee.json` LIMIT 3;

查询任意数据文件的内容:

apache drill> SELECT * FROM dfs.`/home/hao/apache-drill-1.16.0/sample-data/region.parquet`;

退出命令行用!quit

配置和查看Drill参数

如果要永久性修改参数值,需要修改$DRILL_HOME/conf/drill-override.conf文件(见文档);SET、RESET命令可以在当前session里修改参数值(文档)。

配置参数:

SET `store.parquet.block-size` = 536870912

重置参数为缺省值:

RESET `store.parquet.block-size`

查看参数:

select * from sys.options where name = 'store.parquet.block-size'

在java代码里使用Drill

下面是在java代码里使用Drill的例子代码,要注意的一点是,JDBC的URL是jdbc:drill:drillbit=localhost,而不是很多教程上说的jdbc:drill:zk=localhost

package com.acme;

import java.sql.*;

public class DrillJDBCExample {
    static final String JDBC_DRIVER = "org.apache.drill.jdbc.Driver";
    //static final String DB_URL = "jdbc:drill:zk=localhost:2181";
    static final String DB_URL = "jdbc:drill:drillbit=localhost"; //for embedded mode installation

    static final String USER = "admin";
    static final String PASS = "admin";

    public static void main(String[] args) {
        Connection conn = null;
        Statement stmt = null;
        try{
            Class.forName(JDBC_DRIVER);
            conn = DriverManager.getConnection(DB_URL,USER,PASS);

            stmt = conn.createStatement();
            /* Perform a select on data in the classpath storage plugin. */
            String sql = "select employee_id,first_name,last_name from cp.`employee.json`";
            ResultSet rs = stmt.executeQuery(sql);

            while(rs.next()) {
                int id  = rs.getInt("employee_id");
                String first = rs.getString("first_name");
                String last = rs.getString("last_name");

                System.out.print("ID: " + id);
                System.out.print(", First: " + first);
                System.out.println(", Last: " + last);
            }

            rs.close();
            stmt.close();
            conn.close();
        } catch(SQLException se) {
            //Handle errors for JDBC
            se.printStackTrace();
        } catch(Exception e) {
            //Handle errors for Class.forName
            e.printStackTrace();
        } finally {
            try{
                if(stmt!=null)
                    stmt.close();
            } catch(SQLException se2) {
            }
            try {
                if(conn!=null)
                    conn.close();
            } catch(SQLException se) {
                se.printStackTrace();
            }
        }
    }
}

让Drill访问数据库

根据要访问的数据库的不同,需要为Drill添加相应的驱动,方法见RDBMS Storage Plugin

利用Drill将csv格式转换为parquet格式

原理是在drill里创建一张格式为parquet的表,该表的路径(下例中的/parquet1)对应的是磁盘上的一个目录。

ALTER SESSION SET `store.format`='parquet';
ALTER SESSION SET `store.parquet.compression` = 'snappy';

CREATE TABLE dfs.tmp.`/parquet1` AS 
SELECT * FROM dfs.`/my/csv/file.csv`;

让drill支持.zip、.arc压缩格式

(暂缺)

参考资料

Drill in 10 Minutes
How to convert a csv file to parquet

Fragment里getActivity()为空问题

问题

一个常见的场景:fragment在onViewCreated()里执行一个任务(AsyncTask)加载一些数据,任务没有完成时,用户切换到其他界面,这时如果在代码里调用了getActivity()方法,得到的将是空值。

这是一个在本地不容易发现的问题,但通过在线异常报告(例如bugly或友盟)容易发现。

解决方法

同时采取以下两项措施:

  1. 在任务的doInBackground()的一开始就获取context,而不是在每处都是用getActivity()获取;
  2. onStop()onDetach()方法里取消正在执行的任务以避免onPostExecute()被继续执行。

参考资料

getActivity() returns null in Fragment function

应用计时手机权限配置指南

应用计时需要常驻手机后台才能准确计时,目前各个品牌的手机默认都会对后台应用做限制,以达到节电的目的,因此需要手工开启一些权限。

各品牌手机开启权限的方法如下:

1. 小米手机(MIUI)

开机自启动(必须)

方法1:安全中心 -> 应用管理 -> 权限 -> 自启动 -> 自启动管理
方法2:设置 -> 授权管理 -> 自启动管理

悬浮窗(建议)

悬浮窗提醒时长,以及强制模式禁用当前应用,这两项功能需要悬浮窗权限。
方法1:安全中心 -> 应用管理 -> 权限 -> 应用权限管理 -> 应用计时 -> 显示悬浮窗
方法2:设置 -> 授权管理 -> 应用权限管理 -> 应用计时 -> 显示悬浮窗

无障碍(建议)

当计时模式选择的是辅助模式时,这一项是必须配置的。
方法:设置 -> 更多设置 -> 无障碍

锁定后台(建议)

锁定后台的目的是避免一键内存清理。
MIUI10:打开最近任务列表,按住应用计时,在弹出菜单里选择加锁。
MIUI9:打开最近任务列表,下拉应用计时加锁。

后台弹出界面(建议)

强制模式下,如果选择了超时直接退出应用,需要此项权限。
方法:安全中心 -> 应用管理 -> 权限 -> 应用权限管理 -> 应用计时 -> 后台弹出界面

2. 华为手机(EMUI 9)

开机自启动(必须)

方法:设置 > 应用 > 自启动 > 应用计时 > 允许自启动

悬浮窗(建议)

悬浮窗提醒时长,以及强制模式禁用当前应用,这两项功能需要悬浮窗权限。
方法:设置 > 应用 > 应用计时 > 显示在其他应用之上

无障碍(建议)

方法:设置 > 智能辅助 > 无障碍 > 应用计时

锁定后台(建议)

锁定后台的目的是避免一键内存清理。
方法:打开最近任务列表,下拉应用计时加锁。

3. 魅族手机 (Flyme 7.2)

开机自启动(必须)

待补充

后台运行(建议)

方法: 手机管家 > 权限管理 > 应用管理 > 应用计时 > 后台管理 > 允许后台运行

悬浮窗(建议)

悬浮窗提醒时长,以及强制模式禁用当前应用,这两项功能需要悬浮窗权限。
方法:手机管家 > 权限管理 > 应用管理 > 应用计时 > 悬浮窗

无障碍(建议)

方法:设置 > 辅助功能 > 无障碍 > 应用计时

锁定后台(建议)

锁定后台的目的是避免一键内存清理。
方法:多任务列表,下拉应用计时加锁。

4. VIVO手机 (Funtouch OS)

开机自启动(必须)

方法:设置 > 更多设置权限管理应用计时单项权限设置自启动

悬浮窗(建议)

悬浮窗提醒时长,以及强制模式禁用当前应用,这两项功能需要悬浮窗权限。
方法:设置 > 更多设置权限管理应用计时单项权限设置悬浮窗

无障碍(建议)

方法:设置 > 更多设置 > 辅助功能 > 应用计时

锁定后台(建议)

锁定后台的目的是避免一键内存清理。
方法:多任务列表,下拉应用计时图标加锁。

后台弹出界面(建议)

方法:设置 > 更多设置权限管理应用计时单项权限设置后台弹出界面

由于市面手机机型和版本数量众多,以上内容难免有不完整或错误之处,请加QQ群763350557反馈您的宝贵意见。

感谢以下网友的贡献:

  • 吃瓜群众
  • 冬日可爱

Intellij IDEA输出jar包注意事项

Intellij IDEA Ultimate 2016.1

菜单File -> Project Structures -> Artifacts

注意:MANIFEST.MF文件不要使用默认的路径,应放在项目根目录下,否则打包后此文件不在jar包内或内容不正确。

 

输出"uber-jar"运行时可能会遇到错误“Exception in thread "main" java.lang.SecurityException: Invalid signature file digest for Manifest main attributes”,此时可尝试删除jar包里META-INF目录下所有的.SF、.DSA和.RSA文件。

参考:https://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar

Win7访问家庭组共享文件夹提示没有权限

问题:Win7电脑访问Win10电脑的共享文件夹时,提示下面的错误:

解决方法:

1、在Win10高级共享设置里,勾选“关闭密码保护共享”。这样客户端访问时就不会提示输入用户名密码了(我遇到的情况是即使输入了正确的用户名密码也提示不正确,这个问题并没解决)。

2、在Win10共享的文件夹属性的“安全”页里,加入"Everyone"这个用户并设置足够的权限即可。并不像网上说的那样,需要修改组策略“网络安全:LAN 管理器身份验证级别”。

查看Android应用占用内存情况

要查看指定app在手机上占用多少运行内存,首先将手机连接到电脑,然后在命令行执行下面的命令(其中com.my.package.name是app的包名):

adb shell dumpsys meminfo com.my.package.name

执行结果通常如下,其中Pss那一列的值(单位:kB)是我们主要需要关注的:

adb-dumpsys

参考链接:

adb shell dumpsys meminfo - What is the meaning of each cell of its output?

AndroidStudio无法在MIUI安装APK问题

现象

AndroidStudio 2.3,在小米4c搭载miui8真机上运行程序,提示下面的错误信息:

Installation error: INSTALL_CANCELED_BY_USER

 

解决方案

在MIUI开发者选项里,关闭“启用MIUI优化”选项。

关闭此选项时被要求重启,重启后暂时没有发现日常使用有什么变化。

Update: 关闭此选项后发现手机发热和耗电明显,应该是对后台应用的拦截失效导致的。

turn_off_miui_opt

参考链接:

Android Studio: Application Installation Failed