zabbix agent 安装脚本

#!/bin/bash

#define color variables
RED=’\E[1;31m’
GRN=’\E[1;32m’
YEL=’\E[1;33m’
MAG=’\E[1;35m’
RES=’\E[0m’
#define environment variables
ZB_NAME=”zabbix”
ZB_VERSION=”3.2.4″
ZB_SOFT=”${ZB_NAME}-${ZB_VERSION}.tar.gz”
ZB_SOURCE_DIR=”/data/usr/src”
ZB_INSTALL_DIR=”/data/usr/local”
ZB_SOFT_DIR=”/data/soft”
ZB_CONF_FILE=”${ZB_INSTALL_DIR}/${ZB_NAME}/etc/zabbix_agentd.conf”
ZB_SER_IP=’10.26.83.53′
ZB_PUBLIC_DIR=”/mnt/zabbix”
ZB_LOG_DIR=”${ZB_PUBLIC_DIR}/zabbix_agentd.log”
ZB_PID_DIR=”${ZB_PUBLIC_DIR}/zabbix_agentd.pid”
ZB_PORT=’10050′
ZB_HOSTNAME=`hostname`
ZB_USER=`cat /etc/passwd |grep zabbix| wc -l`

check_dependency () {

NUM=`lsof -i :${ZB_PORT} | grep “LISTEN” | grep -v grep | wc -l`
if [ ${NUM} -ne 0 ];then
echo -e “${GRN}zabbix agent is install,${RES}”
exit 1
fi

if [ ${ZB_USER} -eq 0 ];then
useradd -s /sbin/nologin zabbix
fi
[ ${UID} -ne 0 ] && echo -e “${RED}need to be root to that${RES}” && exit 1
[ ! -d ${ZB_SOFT_DIR} ] && echo -e “${RED}no ${ZB_SOFT_DIR},Create a directory…${RES}” && mkdir -pv ${ZB_SOFT_DIR}
[ ! -f ${ZB_SOFT_DIR}/${ZB_SOFT} ] && echo -e “${RED}not fount file ${ZB_SOFT}… , please check…${RES}” && exit 1
[ ! -d ${ZB_SOURCE_DIR} ] && echo -e “${RED}no ${ZB_SOURCE_DIR},Create a directory…${RES}” && mkdir -pv ${ZB_SOURCE_DIR}
[ ! -d ${ZB_INSTALL_DIR} ] && echo -e “${RED}no ${ZB_INSTALL_DIR},Create a directory…${RES}” && mkdir -pv ${ZB_INSTALL_DIR}
[ ! -d ${ZB_PUBLIC_DIR} ] && echo -e “${RED}no ${ZB_PUBLIC_DIR},Create a directory…${RES}” && mkdir -pv ${ZB_PUBLIC_DIR} && chown -R ${ZB_NAME}.${ZB_NAME} ${ZB_PUBLIC_DIR}
PUBLIC_DIR_OMNER=`stat -c %G ${ZB_PUBLIC_DIR}`
PUBLIC_DIR_GROUP=`stat -c %G ${ZB_PUBLIC_DIR}`
if [ “${PUBLIC_DIR_OMNER}” != “${ZB_NAME}” ] || [ “${PUBLIC_DIR_GROUP}” != “${ZB_NAME}” ];then
chown -R ${ZB_NAME}.${ZB_NAME} ${ZB_PUBLIC_DIR}
fi
}

start_install () {
[ -d ${ZB_SOURCE_DIR}/${ZB_NAME}-${ZB_VERSION} ] && rm -rf ${ZB_SOURCE_DIR}/${ZB_NAME}-${ZB_VERSION}
[ -d ${ZB_INSTALL_DIR}/${ZB_NAME} ] && rm -rf ${ZB_INSTALL_DIR}/${ZB_NAME}
tar xf ${ZB_SOFT_DIR}/${ZB_SOFT} -C ${ZB_SOURCE_DIR}
cd ${ZB_SOURCE_DIR}/${ZB_NAME}-${ZB_VERSION}
if [ $? -eq 0 ];then
yum -y install gcc curl lsof
if [ $? -eq 0 ];then
./configure –prefix=${ZB_INSTALL_DIR}/${ZB_NAME} –enable-agent
if [ $? -eq 0 ];then
make
if [ $? -eq 0 ];then
make install
if [ $? -eq 0 ];then
echo -e “${GRN}zabbix agent install success${RES}”
else
echo -e “${RED}zabbix agent install faild,MAKE INSTALL faild,please check…${RES}”
exit 1
fi
else
echo -e “${RED}zabbix agent install faild,MAKE faild,please check…${RES}”
exit 1
fi
else
echo -e “${RED}zabbix agent install faild,CONFIGURE faild,please check…${RES}”
exit 1
fi
else
echo -e “${RED}yum set faild…,please check yum…${RES}”
exit 1
fi
else
echo -e “${RED}directory change faild or zabbix source directory not fount,please check…${RES}”
exit 1
fi
}

start_configure () {
# backup zabbix agent configure file
cp ${ZB_CONF_FILE} ${ZB_CONF_FILE}\-bak
sed -i “s#\# PidFile=/tmp/zabbix_agentd.pid#PidFile=${ZB_PID_DIR}#g” ${ZB_CONF_FILE}
sed -i “s#LogFile=/tmp/zabbix_agentd.log#LogFile=${ZB_LOG_DIR}#g” ${ZB_CONF_FILE}
sed -i “s#Server=127.0.0.1#Server=${ZB_SER_IP}#g” ${ZB_CONF_FILE}
sed -i “s#ServerActive=127.0.0.1#ServerActive=${ZB_SER_IP}#g” ${ZB_CONF_FILE}
sed -i “s#Hostname=Zabbix server#Hostname=${ZB_HOSTNAME}#g” ${ZB_CONF_FILE}
}

start_zabbix_agent () {
${ZB_INSTALL_DIR}/${ZB_NAME}/sbin/zabbix_agentd
NUM=`lsof -i :${ZB_PORT} | grep “LISTEN” | grep -v grep | wc -l`
if [ ${NUM} -ne 0 ];then
echo -e “${GRN}zabbix agent start success${RES}”
else
echo -e “${RED}zabbix agent start faild${RES}”
fi
}

check_dependency
start_install
start_configure
start_zabbix_agent

重启service脚本

PID=`ps aux | grep $1 | grep -v grep | grep -v $0 | awk ‘{print $2}’`
export JAVA_HOME=/usr/lib/jvm/jdk7
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
if [ -z $PID ];then
sh $1/student-service/bin/start.sh
if [ $? -eq 0 ];then
echo “service start success”
else

echo “service start failed”
fi
else
kill -9 $PID
sh $1/student-service/bin/start.sh
if [ $? -eq 0 ];then
echo “service start success”
else

echo “service start failed”
fi
fi

重启Tomcat脚本

PID=`ps aux | grep $1 | grep -v grep | grep -v $0 | awk ‘{print $2}’`
export JAVA_HOME=/usr/lib/jvm/jdk7
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
if [ -z $PID ];then
sh $1/bin/startup.sh
if [ $? -eq 0 ];then
echo “tomcat start success”
else

echo “tomcat start failed”
fi
else
kill -9 $PID
sh $1/bin/startup.sh
if [ $? -eq 0 ];then
echo “tomcat start success”
else

echo “tomcat start failed”
fi
fi

消息队列设计精要

摘要: 本文摘自美团点评技术团队Blog

声明:本文摘自美团点评技术团队Blog,本文只摘取了全文中自己比较关注的点也对部分的点加了修改,便于以后查看,如果希望看原文,请查看转发链接

消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。
当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发的Notify、MetaQ、RocketMQ等。
本文不会一一介绍这些消息队列的所有特性,而是探讨一下自主开发设计一个消息队列时,你需要思考和设计的重要方面。过程中我们会参考这些成熟消息队列的很多重要思想。
本文首先会阐述什么时候你需要一个消息队列,然后以Push模型为主,从零开始分析设计一个消息队列时需要考虑到的问题,如RPC、高可用、顺序和重复消息、可靠投递、消费关系解析等。
也会分析以Kafka为代表的pull模型所具备的优点。最后是一些高级主题,如用批量/异步提高性能、pull模型的系统设计理念、存储子系统的设计、流量控制的设计、公平调度的实现等。其中最后四个方面会放在下篇讲解。

何时需要消息队列

当你需要使用消息队列时,首先需要考虑它的必要性。可以使用mq的场景有很多,最常用的几种,是做业务解耦/最终一致性/广播/错峰流控等。反之,如果需要强一致性,关注业务逻辑的处理结果,则RPC显得更为合适。

解耦

比方说某个点评模块,用户点评到点评成功,之间可能会对点评的内容进行广告审核,涉黄涉政,或者其他校验,假如使用串行耦合结构的话,用户点评到点评成功的时间,包括了所有模块的校验,随着校验越来越多,点评成功的时间也就越来越长,自然而然会想到将各个模块并行化,但是并行化还是没有解耦,点评成功的时间将是各个模块消耗时间最长的那个,用户其实只关心点评成功,这里就可以使用消息中间件来进行解耦,将各个模块的校验由消息中间件来确保送达,用户就不需要等待这些模块了,由消息中间件来确保最终一致性就可以了。

最终一致性

最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。当然有个时间限制,理论上越快越好,但实际上在各种异常的情况下,可能会有一定延迟达到最终一致状态,但最后两个系统的状态是一样的。
业界有一些为“最终一致性”而生的消息队列,如Notify(阿里)、QMQ(去哪儿)等,其设计初衷,就是为了交易系统中的高可靠通知。
以一个银行的转账过程来理解最终一致性,转账的需求很简单,如果A系统扣钱成功,则B系统加钱一定成功。反之则一起回滚,像什么都没发生一样。
然而,这个过程中存在很多可能的意外:

  1. A扣钱成功,调用B加钱接口失败。
  2. A扣钱成功,调用B加钱接口虽然成功,但获取最终结果时网络异常引起超时。
  3. A扣钱成功,B加钱失败,A想回滚扣的钱,但A机器down机。

可见,想把这件看似简单的事真正做成,真的不那么容易。所有跨VM的一致性问题,从技术的角度讲通用的解决方案是:

  1. 强一致性,分布式事务,但落地太难且成本太高,后文会具体提到。
  2. 最终一致性,主要是用“记录”和“补偿”的方式。在做所有的不确定的事情之前,先把事情记录下来,然后去做不确定的事情,结果可能是:成功、失败或是不确定,“不确定”(例如超时等)可以等价为失败。成功就可以把记录的东西清理掉了,对于失败和不确定,可以依靠定时任务等方式把所有失败的事情重新搞一遍,直到成功为止。
    回到刚才的例子,系统在A扣钱成功的情况下,把要给B“通知”这件事记录在库里(为了保证最高的可靠性可以把通知B系统加钱和扣钱成功这两件事维护在一个本地事务里),通知成功则删除这条记录,通知失败或不确定则依靠定时任务补偿性地通知我们,直到我们把状态更新成正确的为止。
    整个这个模型依然可以基于RPC来做,但可以抽象成一个统一的模型,基于消息队列来做一个“企业总线”。
    具体来说,本地事务维护业务变化和通知消息,一起落地(失败则一起回滚),然后RPC到达broker,在broker成功落地后,RPC返回成功,本地消息可以删除。否则本地消息一直靠定时任务轮询不断重发,这样就保证了消息可靠落地broker。
    broker往consumer发送消息的过程类似,一直发送消息,直到consumer发送消费成功确认。
    我们先不理会重复消息的问题,通过两次消息落地加补偿,下游是一定可以收到消息的。然后依赖状态机版本号等方式做判重,更新自己的业务,就实现了最终一致性。

最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情。另外,所有不保证100%不丢消息的消息队列,理论上无法实现最终一致性。好吧,应该说理论上的100%,排除系统严重故障和bug。
像Kafka一类的设计,在设计层面上就有丢消息的可能(比如定时刷盘,如果掉电就会丢消息)。哪怕只丢千分之一的消息,业务也必须用其他的手段来保证结果正确。

广播

消息队列的基本功能之一是进行广播。如果没有消息队列,每当一个新的业务方接入,我们都要联调一次新接口。有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。
比如本文开始提到的产品中心发布产品变更的消息,以及景点库很多去重更新的消息,可能“关心”方有很多个,但产品中心和景点库只需要发布变更消息即可,谁关心谁接入。

错峰与流控

试想上下游对于事情的处理能力是不同的。比如,Web前端每秒承受上千万的请求,并不是什么神奇的事情,只需要加多一点机器,再搭建一些LVS负载均衡设备和Nginx等即可。但数据库的处理能力却十分有限,即使使用SSD加分库分表,单机的处理能力仍然在万级。由于成本的考虑,我们不能奢求数据库的机器数量追上前端。
这种问题同样存在于系统和系统之间,如短信系统可能由于短板效应,速度卡在网关上(每秒几百次请求),跟前端的并发量不是一个数量级。但用户晚上个半分钟左右收到短信,一般是不会有太大问题的。如果没有消息队列,两个系统之间通过协商、滑动窗口等复杂的方案也不是说不能实现。但系统复杂性指数级增长,势必在上游或者下游做存储,并且要处理定时、拥塞等一系列问题。而且每当有处理能力有差距的时候,都需要单独开发一套逻辑来维护这套逻辑。所以,利用中间系统转储两个系统的通信内容,并在下游系统有能力处理这些消息的时候,再处理这些消息,是一套相对较通用的方式。

总而言之,消息队列不是万能的。对于需要强事务保证而且延迟敏感的,RPC是优于消息队列的。
对于一些无关痛痒,或者对于别人非常重要但是对于自己不是那么关心的事情,可以利用消息队列去做。
支持最终一致性的消息队列,能够用来处理延迟不那么敏感的“分布式事务”场景,而且相对于笨重的分布式事务,可能是更优的处理方式。
当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的“漏斗”。在下游有能力处理的时候,再进行分发。
如果下游有很多系统关心你的系统发出的通知的时候,果断地使用消息队列吧。

如何设计一个消息队列

综述

我们现在明确了消息队列的使用场景,下一步就是如何设计实现一个消息队列了。

基于消息的系统模型,不一定需要broker(消息队列服务端)。市面上的的Akka(actor模型)、ZeroMQ等,其实都是基于消息的系统设计范式,但是没有broker。
我们之所以要设计一个消息队列,并且配备broker,无外乎要做两件事情:

  1. 消息的转储,在更合适的时间点投递,或者通过一系列手段辅助消息最终能送达消费机。
  2. 规范一种范式和通用的模式,以满足解耦、最终一致性、错峰等需求。
    掰开了揉碎了看,最简单的消息队列可以做成一个消息转发器,把一次RPC做成两次RPC。发送者把消息投递到服务端(以下简称broker),服务端再将消息转发一手到接收端,就是这么简单。

一般来讲,设计消息队列的整体思路是先build一个整体的数据流,例如producer发送给broker,broker发送给consumer,consumer回复消费确认,broker删除/备份消息等。
利用RPC将数据流串起来。然后考虑RPC的高可用性,尽量做到无状态,方便水平扩展。
之后考虑如何承载消息堆积,然后在合适的时机投递消息,而处理堆积的最佳方式,就是存储,存储的选型需要综合考虑性能/可靠性和开发维护成本等诸多因素。
为了实现广播功能,我们必须要维护消费关系,可以利用zk/config server等保存消费关系。
在完成了上述几个功能后,消息队列基本就实现了。然后我们可以考虑一些高级特性,如可靠投递,事务特性,性能优化等。
下面我们会以设计消息队列时重点考虑的模块为主线,穿插灌输一些消息队列的特性实现方法,来具体分析设计实现一个消息队列时的方方面面。

实现队列基本功能

RPC通信协议

刚才讲到,所谓消息队列,无外乎两次RPC加一次转储,当然需要消费端最终做消费确认的情况是三次RPC。既然是RPC,就必然牵扯出一系列话题,什么负载均衡啊、服务发现啊、通信协议啊、序列化协议啊,等等。在这一块,我的强烈建议是不要重复造轮子。利用公司现有的RPC框架:Thrift也好,Dubbo也好,或者是其他自定义的框架也好。因为消息队列的RPC,和普通的RPC没有本质区别。当然了,自主利用Memchached或者Redis协议重新写一套RPC框架并非不可(如MetaQ使用了自己封装的Gecko NIO框架,卡夫卡也用了类似的协议)。但实现成本和难度无疑倍增。排除对效率的极端要求,都可以使用现成的RPC框架。
简单来讲,服务端提供两个RPC服务,一个用来接收消息,一个用来确认消息收到。并且做到不管哪个server收到消息和确认消息,结果一致即可。当然这中间可能还涉及跨IDC的服务的问题。这里和RPC的原则是一致的,尽量优先选择本机房投递。你可能会问,如果producer和consumer本身就在两个机房了,怎么办?首先,broker必须保证感知的到所有consumer的存在。其次,producer尽量选择就近的机房就好了。

高可用

其实所有的高可用,是依赖于RPC和存储的高可用来做的。先来看RPC的高可用,美团的基于MTThrift的RPC框架,阿里的Dubbo等,其本身就具有服务自动发现,负载均衡等功能。而消息队列的高可用,只要保证broker接受消息和确认消息的接口是幂等的,并且consumer的几台机器处理消息是幂等的,这样就把消息队列的可用性,转交给RPC框架来处理了。
那么怎么保证幂等呢?最简单的方式莫过于共享存储。broker多机器共享一个DB或者一个分布式文件/kv系统,则处理消息自然是幂等的。就算有单点故障,其他节点可以立刻顶上。另外failover可以依赖定时任务的补偿,这是消息队列本身天然就可以支持的功能。存储系统本身的可用性我们不需要操太多心,放心大胆的交给DBA们吧!
对于不共享存储的队列,如Kafka使用分区加主备模式,就略微麻烦一些。需要保证每一个分区内的高可用性,也就是每一个分区至少要有一个主备且需要做数据的同步,关于这块HA的细节,可以参考下篇pull模型消息系统设计。

服务端承载消息堆积的能力

消息到达服务端如果不经过任何处理就到接收者了,broker就失去了它的意义。为了满足我们错峰/流控/最终可达等一系列需求,把消息存储下来,然后选择时机投递就显得是顺理成章的了。
只是这个存储可以做成很多方式。比如存储在内存里,存储在分布式KV里,存储在磁盘里,存储在数据库里等等。但归结起来,主要有持久化和非持久化两种。
持久化的形式能更大程度地保证消息的可靠性(如断电等不可抗外力),并且理论上能承载更大限度的消息堆积(外存的空间远大于内存)。
但并不是每种消息都需要持久化存储。很多消息对于投递性能的要求大于可靠性的要求,且数量极大(如日志)。这时候,消息不落地直接暂存内存,尝试几次failover,最终投递出去也未尝不可。
市面上的消息队列普遍两种形式都支持。当然具体的场景还要具体结合公司的业务来看。

存储子系统的选择

我们来看看如果需要数据落地的情况下各种存储子系统的选择。理论上,从速度来看,文件系统>分布式KV(持久化)>分布式文件系统>数据库,而可靠性却截然相反。还是要从支持的业务场景出发作出最合理的选择,如果你们的消息队列是用来支持支付/交易等对可靠性要求非常高,但对性能和量的要求没有这么高,而且没有时间精力专门做文件存储系统的研究,DB是最好的选择。
但是DB受制于IOPS,如果要求单broker 5位数以上的QPS性能,基于文件的存储是比较好的解决方案。整体上可以采用数据文件+索引文件的方式处理,具体这块的设计比较复杂,可以参考下篇的存储子系统设计。
分布式KV(如MongoDB,HBase)等,或者持久化的Redis,由于其编程接口较友好,性能也比较可观,如果在可靠性要求不是那么高的场景,也不失为一个不错的选择。

消费关系解析

现在我们的消息队列初步具备了转储消息的能力。下面一个重要的事情就是解析发送接收关系,进行正确的消息投递了。
市面上的消息队列定义了一堆让人晕头转向的名词,如JMS 规范中的Topic/Queue,Kafka里面的Topic/Partition/ConsumerGroup,RabbitMQ里面的Exchange等等。抛开现象看本质,无外乎是单播与广播的区别。所谓单播,就是点到点;而广播,是一点对多点。当然,对于互联网的大部分应用来说,组间广播、组内单播是最常见的情形。
消息需要通知到多个业务集群,而一个业务集群内有很多台机器,只要一台机器消费这个消息就可以了。
当然这不是绝对的,很多时候组内的广播也是有适用场景的,如本地缓存的更新等等。另外,消费关系除了组内组间,可能会有多级树状关系。这种情况太过于复杂,一般不列入考虑范围。所以,一般比较通用的设计是支持组间广播,不同的组注册不同的订阅。组内的不同机器,如果注册一个相同的ID,则单播;如果注册不同的ID(如IP地址+端口),则广播。
至于广播关系的维护,一般由于消息队列本身都是集群,所以都维护在公共存储上,如config server、zookeeper等。维护广播关系所要做的事情基本是一致的:

  1. 发送关系的维护。
  2. 发送关系变更时的通知。

队列高级特性设计

上面都是些消息队列基本功能的实现,下面来看一些关于消息队列特性相关的内容,不管可靠投递/消息丢失与重复以及事务乃至于性能,不是每个消息队列都会照顾到,所以要依照业务的需求,来仔细衡量各种特性实现的成本,利弊,最终做出最为合理的设计。

可靠投递(最终一致性)

这是个激动人心的话题,完全不丢消息,究竟可不可能?答案是,完全可能,前提是消息可能会重复,并且,在异常情况下,要接受消息的延迟。
方案说简单也简单,就是每当要发生不可靠的事情(RPC等)之前,先将消息落地,然后发送。当失败或者不知道成功失败(比如超时)时,消息状态是待发送,定时任务不停轮询所有待发送消息,最终一定可以送达。
具体来说:

  1. producer往broker发送消息之前,需要做一次落地。
  2. 请求到server后,server确保数据落地后再告诉客户端发送成功。
  3. 支持广播的消息队列需要对每个待发送的endpoint,持久化一个发送状态,直到所有endpoint状态都OK才可删除消息。

对于各种不确定(超时、down机、消息没有送达、送达后数据没落地、数据落地了回复没收到),其实对于发送方来说,都是一件事情,就是消息没有送达。
重推消息所面临的问题就是消息重复。重复和丢失就像两个噩梦,你必须要面对一个。好在消息重复还有处理的机会,消息丢失再想找回就难了。
Anyway,作为一个成熟的消息队列,应该尽量在各个环节减少重复投递的可能性,不能因为重复有解决方案就放纵的乱投递。
最后说一句,不是所有的系统都要求最终一致性或者可靠投递,比如一个论坛系统、一个招聘系统。一个重复的简历或话题被发布,可能比丢失了一个发布显得更让用户无法接受。不断重复一句话,任何基础组件要服务于业务场景。

消费确认

当broker把消息投递给消费者后,消费者可以立即响应我收到了这个消息。但收到了这个消息只是第一步,我能不能处理这个消息却不一定。或许因为消费能力的问题,系统的负荷已经不能处理这个消息;或者是刚才状态机里面提到的消息不是我想要接收的消息,主动要求重发。
把消息的送达和消息的处理分开,这样才真正的实现了消息队列的本质-解耦。所以,允许消费者主动进行消费确认是必要的。当然,对于没有特殊逻辑的消息,默认Auto Ack也是可以的,但一定要允许消费方主动ack。
对于正确消费ack的,没什么特殊的。但是对于reject和error,需要特别说明。reject这件事情,往往业务方是无法感知到的,系统的流量和健康状况的评估,以及处理能力的评估是一件非常复杂的事情。举个极端的例子,收到一个消息开始build索引,可能这个消息要处理半个小时,但消息量却是非常的小。所以reject这块建议做成滑动窗口/线程池类似的模型来控制,
消费能力不匹配的时候,直接拒绝,过一段时间重发,减少业务的负担。
但业务出错这件事情是只有业务方自己知道的,就像上文提到的状态机等等。这时应该允许业务方主动ack error,并可以与broker约定下次投递的时间。

重复消息和顺序消息

上文谈到重复消息是不可能100%避免的,除非可以允许丢失,那么,顺序消息能否100%满足呢? 答案是可以,但条件更为苛刻:

  1. 允许消息丢失。
  2. 从发送方到服务方到接受者都是单点单线程。

所以绝对的顺序消息基本上是不能实现的,当然在METAQ/Kafka等pull模型的消息队列中,单线程生产/消费,排除消息丢失,也是一种顺序消息的解决方案。
一般来讲,一个主流消息队列的设计范式里,应该是不丢消息的前提下,尽量减少重复消息,不保证消息的投递顺序。
谈到重复消息,主要是两个话题:

  1. 如何鉴别消息重复,并幂等的处理重复消息。
  2. 一个消息队列如何尽量减少重复消息的投递。

先来看看第一个话题,每一个消息应该有它的唯一身份。不管是业务方自定义的,还是根据IP/PID/时间戳生成的MessageId,如果有地方记录这个MessageId,消息到来是能够进行比对就
能完成重复的鉴定。数据库的唯一键/bloom filter/分布式KV中的key,都是不错的选择。由于消息不能被永久存储,所以理论上都存在消息从持久化存储移除的瞬间上游还在投递的可能(上游因种种原因投递失败,不停重试,都到了下游清理消息的时间)。这种事情都是异常情况下才会发生的,毕竟是小众情况。两分钟消息都还没送达,多送一次又能怎样呢?幂等的处理消息是一门艺术,因为种种原因重复消息或者错乱的消息还是来到了,说两种通用的解决方案:

  1. 版本号。
  2. 状态机。

版本号

举个简单的例子,一个产品的状态有上线/下线状态。如果消息1是下线,消息2是上线。不巧消息1判重失败,被投递了两次,且第二次发生在2之后,如果不做重复性判断,显然最终状态是错误的。
但是,如果每个消息自带一个版本号。上游发送的时候,标记消息1版本号是1,消息2版本号是2。如果再发送下线消息,则版本号标记为3。下游对于每次消息的处理,同时维护一个版本号。
每次只接受比当前版本号大的消息。初始版本为0,当消息1到达时,将版本号更新为1。消息2到来时,因为版本号>1.可以接收,同时更新版本号为2.当另一条下线消息到来时,如果版本号是3.则是真实的下线消息。如果是1,则是重复投递的消息。
如果业务方只关心消息重复不重复,那么问题就已经解决了。但很多时候另一个头疼的问题来了,就是消息顺序如果和想象的顺序不一致。比如应该的顺序是12,到来的顺序是21。则最后会发生状态错误。
参考TCP/IP协议,如果想让乱序的消息最后能够正确的被组织,那么就应该只接收比当前版本号大一的消息。并且在一个session周期内要一直保存各个消息的版本号。
如果到来的顺序是21,则先把2存起来,待2到来后,再处理1,这样重复性和顺序性要求就都达到了。

状态机

基于版本号来处理重复和顺序消息听起来是个不错的主意,但凡事总有瑕疵。使用版本号的最大问题是:

  1. 对发送方必须要求消息带业务版本号。
  2. 下游必须存储消息的版本号,对于要严格保证顺序的。

还不能只存储最新的版本号的消息,要把乱序到来的消息都存储起来。而且必须要对此做出处理。试想一个永不过期的”session”,比如一个物品的状态,会不停流转于上下线。那么中间环节的所有存储
就必须保留,直到在某个版本号之前的版本一个不丢的到来,成本太高。
就刚才的场景看,如果消息没有版本号,该怎么解决呢?业务方只需要自己维护一个状态机,定义各种状态的流转关系。例如,”下线”状态只允许接收”上线”消息,“上线”状态只能接收“下线消息”,如果上线收到上线消息,或者下线收到下线消息,在消息不丢失和上游业务正确的前提下。要么是消息发重了,要么是顺序到达反了。这时消费者只需要把“我不能处理这个消息”告诉投递者,要求投递者过一段时间重发即可。而且重发一定要有次数限制,比如5次,避免死循环,就解决了。
举例子说明,假设产品本身状态是下线,1是上线消息,2是下线消息,3是上线消息,正常情况下,消息应该的到来顺序是123,但实际情况下收到的消息状态变成了3123。
那么下游收到3消息的时候,判断状态机流转是下线->上线,可以接收消息。然后收到消息1,发现是上线->上线,拒绝接收,要求重发。然后收到消息2,状态是上线->下线,于是接收这个消息。
此时无论重发的消息1或者3到来,还是可以接收。另外的重发,在一定次数拒绝后停止重发,业务正确。

中间件对于重复消息的处理

回归到消息队列的话题来讲。上述通用的版本号/状态机/ID判重解决方案里,哪些是消息队列该做的、哪些是消息队列不该做业务方处理的呢?其实这里没有一个完全严格的定义,但回到我们的出发点,我们保证不丢失消息的情况下尽量少重复消息,消费顺序不保证。那么重复消息下和乱序消息下业务的正确,应该是由消费方保证的,我们要做的是减少消息发送的重复。
我们无法定义业务方的业务版本号/状态机,如果API里强制需要指定版本号,则显得过于绑架客户了。况且,在消费方维护这么多状态,就涉及到一个消费方的消息落地/多机间的同步消费状态问题,复杂度指数级上升,而且只能解决部分问题。
减少重复消息的关键步骤:

  1. broker记录MessageId,直到投递成功后清除,重复的ID到来不做处理,这样只要发送者在清除周期内能够感知到消息投递成功,就基本不会在server端产生重复消息。
  2. 对于server投递到consumer的消息,由于不确定对端是在处理过程中还是消息发送丢失的情况下,有必要记录下投递的IP地址。决定重发之前询问这个IP,消息处理成功了吗?如果询问无果,再重发。

事务

持久性是事务的一个特性,然而只满足持久性却不一定能满足事务的特性。还是拿扣钱/加钱的例子讲。满足事务的一致性特征,则必须要么都不进行,要么都能成功。
解决方案从大方向上有两种:

  1. 两阶段提交,分布式事务。
  2. 本地事务,本地落地,补偿发送。

分布式事务存在的最大问题是成本太高,两阶段提交协议,对于仲裁down机或者单点故障,几乎是一个无解的黑洞。对于交易密集型或者I/O密集型的应用,没有办法承受这么高的网络延迟,系统复杂性。
并且成熟的分布式事务一定构建与比较靠谱的商用DB和商用中间件上,成本也太高。
那如何使用本地事务解决分布式事务的问题呢?以本地和业务在一个数据库实例中建表为例子,与扣钱的业务操作同一个事务里,将消息插入本地数据库。如果消息入库失败,则业务回滚;如果消息入库成功,事务提交。
然后发送消息(注意这里可以实时发送,不需要等定时任务检出,以提高消息实时性)。以后的问题就是前文的最终一致性问题所提到的了,只要消息没有发送成功,就一直靠定时任务重试。
这里有一个关键的点,本地事务做的,是业务落地和消息落地的事务,而不是业务落地和RPC成功的事务。这里很多人容易混淆,如果是后者,无疑是事务嵌套RPC,是大忌,会有长事务死锁等各种风险。
而消息只要成功落地,很大程度上就没有丢失的风险(磁盘物理损坏除外)。而消息只要投递到服务端确认后本地才做删除,就完成了producer->broker的可靠投递,并且当消息存储异常时,业务也是可以回滚的。
本地事务存在两个最大的使用障碍:

  1. 配置较为复杂,“绑架”业务方,必须本地数据库实例提供一个库表。
  2. 对于消息延迟高敏感的业务不适用。

话说回来,不是每个业务都需要强事务的。扣钱和加钱需要事务保证,但下单和生成短信却不需要事务,不能因为要求发短信的消息存储投递失败而要求下单业务回滚。所以,一个完整的消息队列应该定义清楚自己可以投递的消息类型,如事务型消息,本地非持久型消息,以及服务端不落地的非可靠消息等。对不同的业务场景做不同的选择。另外事务的使用应该尽量低成本、透明化,可以依托于现有的成熟框架,如Spring的声明式事务做扩展。业务方只需要使用@Transactional标签即可。

性能相关

异步/同步

首先澄清一个概念,异步,同步和oneway是三件事。异步,归根结底你还是需要关心结果的,但可能不是当时的时间点关心,可以用轮询或者回调等方式处理结果;同步是需要当时关心
的结果的;而oneway是发出去就不管死活的方式,这种对于某些完全对可靠性没有要求的场景还是适用的,但不是我们重点讨论的范畴。
回归来看,任何的RPC都是存在客户端异步与服务端异步的,而且是可以任意组合的:客户端同步对服务端异步,客户端异步对服务端异步,客户端同步对服务端同步,客户端异步对服务端同步。
对于客户端来说,同步与异步主要是拿到一个Result,还是Future(Listenable)的区别。实现方式可以是线程池,NIO或者其他事件机制,这里先不展开讲。
服务端异步可能稍微难理解一点,这个是需要RPC协议支持的。参考servlet 3.0规范,服务端可以吐一个future给客户端,并且在future done的时候通知客户端。
整个过程可以参考下面的代码:

客户端同步服务端异步。

Future<Result> future = request(server);//server立刻返回future
synchronized(future){
while(!future.isDone()){
   future.wait();//server处理结束后会notify这个future,并修改isdone标志
}
}
return future.get();

客户端同步服务端同步。

Result result = request(server);

客户端异步服务端同步(这里用线程池的方式)。

Future<Result> future = executor.submit(new Callable(){public void call<Result>(){
    result = request(server);
}})
return future;

客户端异步服务端异步。

Future<Result> future = request(server);//server立刻返回future

return future

上面说了这么多,其实是想让大家脱离两个误区:

  1. RPC只有客户端能做异步,服务端不能。
  2. 异步只能通过线程池。

那么,服务端使用异步最大的好处是什么呢?说到底,是解放了线程和I/O。试想服务端有一堆I/O等待处理,如果每个请求都需要同步响应,每条消息都需要结果立刻返回,那么就几乎没法做I/O合并
(当然接口可以设计成batch的,但可能batch发过来的仍然数量较少)。而如果用异步的方式返回给客户端future,就可以有机会进行I/O的合并,把几个批次发过来的消息一起落地(这种合并对于MySQL等允许batch insert的数据库效果尤其明显),并且彻底释放了线程。不至于说来多少请求开多少线程,能够支持的并发量直线提高。
来看第二个误区,返回future的方式不一定只有线程池。换句话说,可以在线程池里面进行同步操作,也可以进行异步操作,也可以不使用线程池使用异步操作(NIO、事件)。
回到消息队列的议题上,我们当然不希望消息的发送阻塞主流程(前面提到了,server端如果使用异步模型,则可能因消息合并带来一定程度上的消息延迟),所以可以先使用线程池提交一个发送请求,主流程继续往下走。
但是线程池中的请求关心结果吗?Of course,必须等待服务端消息成功落地,才算是消息发送成功。所以这里的模型,准确地说事客户端半同步半异步(使用线程池不阻塞主流程,但线程池中的任务需要等待server端的返回),server端是纯异步。客户端的线程池wait在server端吐回的future上,直到server端处理完毕,才解除阻塞继续进行。
总结一句,同步能够保证结果,异步能够保证效率,要合理的结合才能做到最好的效率。

批量

谈到批量就不得不提生产者消费者模型。但生产者消费者模型中最大的痛点是:消费者到底应该何时进行消费。大处着眼来看,消费动作都是事件驱动的。主要事件包括:

  1. 攒够了一定数量。
  2. 到达了一定时间。
  3. 队列里有新的数据到来。

对于及时性要求高的数据,可用采用方式3来完成,比如客户端向服务端投递数据。只要队列有数据,就把队列中的所有数据刷出,否则将自己挂起,等待新数据的到来。
在第一次把队列数据往外刷的过程中,又积攒了一部分数据,第二次又可以形成一个批量。伪代码如下:

Executor executor = Executors.newFixedThreadPool(4);
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();
private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,故做成全局的
   public void run(){
      List<Message> messages  = new ArrayList<>(20);
      queue.drainTo(messages,20);
      doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会囤新的消息
   }
});
public void send(Message message){
    queue.offer(message);
    executor.submit(task)
}

这种方式是消息延迟和批量的一个比较好的平衡,但优先响应低延迟。延迟的最高程度由上一次发送的等待时间决定。但可能造成的问题是发送过快的话批量的大小不够满足性能的极致。

Executor executor = Executors.newFixedThreadPool(4);
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();
volatile long last = System.currentMills();
Executors.newSingleThreadScheduledExecutor().submit(new Runnable(){
   flush();
},500500,TimeUnits.MILLS);
private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,顾做成全局的。
   public void run(){
      List<Message> messages  = new ArrayList<>(20);
      queue.drainTo(messages,20);
      doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会屯新的消息。
   }
});
public void send(Message message){
    last = System.currentMills();
    queue.offer(message);
    flush();
}
private void flush(){
 if(queue.size>200||System.currentMills()-last>200){
       executor.submit(task)
  }
}

相反对于可以用适量的延迟来换取高性能的场景来说,用定时/定量二选一的方式可能会更为理想,既到达一定数量才发送,但如果数量一直达不到,也不能干等,有一个时间上限。
具体说来,在上文的submit之前,多判断一个时间和数量,并且Runnable内部维护一个定时器,避免没有新任务到来时旧的任务永远没有机会触发发送条件。对于server端的数据落地,使用这种方式就非常方便。

最后啰嗦几句,曾经有人问我,为什么网络请求小包合并成大包会提高性能?主要原因有两个:

  1. 减少无谓的请求头,如果你每个请求只有几字节,而头却有几十字节,无疑效率非常低下。
  2. 减少回复的ack包个数。把请求合并后,ack包数量必然减少,确认和重发的成本就会降低。

push还是pull

上文提到的消息队列,大多是针对push模型的设计。现在市面上有很多经典的也比较成熟的pull模型的消息队列,如Kafka、MetaQ等。这跟JMS中传统的push方式有很大的区别,可谓另辟蹊径。
我们简要分析下push和pull模型各自存在的利弊。

慢消费

慢消费无疑是push模型最大的致命伤,穿成流水线来看,如果消费者的速度比发送者的速度慢很多,势必造成消息在broker的堆积。假设这些消息都是有用的无法丢弃的,消息就要一直在broker端保存。当然这还不是最致命的,最致命的是broker给consumer推送一堆consumer无法处理的消息,consumer不是reject就是error,然后来回踢皮球。
反观pull模式,consumer可以按需消费,不用担心自己处理不了的消息来骚扰自己,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于建立索引等慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适。

消息延迟与忙等

这是pull模式最大的短板。由于主动权在消费方,消费方无法准确地决定何时去拉取最新的消息。如果一次pull取到消息了还可以继续去pull,如果没有pull取到则需要等待一段时间重新pull。
但等待多久就很难判定了。你可能会说,我可以有xx动态pull取时间调整算法,但问题的本质在于,有没有消息到来这件事情决定权不在消费方。也许1分钟内连续来了1000条消息,然后半个小时没有新消息产生,
可能你的算法算出下次最有可能到来的时间点是31分钟之后,或者60分钟之后,结果下条消息10分钟后到了,是不是很让人沮丧?
当然也不是说延迟就没有解决方案了,业界较成熟的做法是从短时间开始(不会对broker有太大负担),然后指数级增长等待。比如开始等5ms,然后10ms,然后20ms,然后40ms……直到有消息到来,然后再回到5ms。
即使这样,依然存在延迟问题:假设40ms到80ms之间的50ms消息到来,消息就延迟了30ms,而且对于半个小时来一次的消息,这些开销就是白白浪费的。
在阿里的RocketMq里,有一种优化的做法-长轮询,来平衡推拉模型各自的缺点。基本思路是:消费者如果尝试拉取失败,不是直接return,而是把连接挂在那里wait,服务端如果有新的消息到来,把连接notify起来,这也是不错的思路。但海量的长连接block对系统的开销还是不容小觑的,还是要合理的评估时间间隔,给wait加一个时间上限比较好~

顺序消息

如果push模式的消息队列,支持分区,单分区只支持一个消费者消费,并且消费者只有确认一个消息消费后才能push送另外一个消息,还要发送者保证全局顺序唯一,听起来也能做顺序消息,但成本太高了,尤其是必须每个消息消费确认后才能发下一条消息,这对于本身堆积能力和慢消费就是瓶颈的push模式的消息队列,简直是一场灾难。
反观pull模式,如果想做到全局顺序消息,就相对容易很多:

  1. producer对应partition,并且单线程。
  2. consumer对应partition,消费确认(或批量确认),继续消费即可。

所以对于日志push送这种最好全局有序,但允许出现小误差的场景,pull模式非常合适。如果你不想看到通篇乱套的日志~~
Anyway,需要顺序消息的场景还是比较有限的而且成本太高,请慎重考虑。

红包随机算法的故事-阀值

场景:

最近大佬在做红包,看大佬在那想东西,然后抱着有什么问题可以帮下他也能成长自己的心理,问了一下,然后他给我出了一道题,也不告诉我啥业务,~(~ ̄▽ ̄)~ 好吧,我一开始以为简单的问题,后来发现有3个维度,还是花了1个多小时想出来,给大家分享下吧,挺有意思的。

问题 :

条件1:随机 a-b的随机数        例:3-8

条件2:随机n次                      例:50次

条件3 : n次累加之和等于s   例:250

以上 a,b,n,s  都是变量  求每一次随机的值

再次叙述:随机3到8的随机数 50 次 ,使他们累计之和等于250,求50次每一次的随机

想法:

1、实现最简单的就是 写个循环跳出条件为s,也就是s=250就跳出,循环里面写 随机a-b的随机数随机n次 ,也就是随机3-8的随机数,随机50次,然后累加放到s里,这个成功几率太低了,没准就死循环了= =,直接pass掉。

2 、后来想到了用回溯算法,简单说就是随机到最后,相加不得250 ,或者随机到第30个累加就大于250,等条件,反正确定无解之后就进行回溯,删去上一个已经随机的随机数,然后从新随机,后来想了想,不太可行,因为每次的数字是随机的嘛- – 而且回溯一般都是已知的树状求解,如果把3,4,5,6,7,8拿出来求解,那就没随机性了= =所以也pass掉了

3 、后来又想了一个,能不能把250拆分,写个递归逐步求解之类的,后来也没想好怎么拆,所以也没写出来:-D

最终,当想了好多之前学的算法之后,发现问题在于,随机50次随机数很简单,但是累加之和等于一个值,问题就来了,很有可能随机到某一次就无解了,如随机了40次就已经到230了就算最后10个随机的都是最小的3也无解,总和是260,不满足累和等于250,如何做一个控制,让他每一次既是随机的,又能最后相加一定能得250,二百五哈哈哈。。。。

解:

突然脑袋里冒出里,一个绳子拴着一条狗的图像  =  = 灵感啊! 我想到了阀值,这个名字是我取的 = =

首先声明,用这个算法其实要有一定条件的,如必须有解,必须能整除,我们假设条件是允许的。

好思路是这样的,首先用250除以50 等于5,也就是说随机50次,每次是5 ,最后能得到250,废话= = ,但是5就是阀值,这个是不变的。

还有个变量阀值,这个就向绳子一样,打个比方,第一次随机的数是3,那变量阀值就是-2,如果下次随机到7,那么变量阀值就回变成0,让变量阀值在0左右浮动。

如果随机到第49个,变量阀值是-3 ,那么随机数随机到8,也就是把上一次低于阀值5的3个加上这次该随机的5个就是8个,最后相加等于250.

变量阀值是有范围的,范围是3到-2 ,也就是说不出这个范围,当随机到最后一个数的时候,在3到8中的一个值是可以放上去相加能等于250的,当然随机到第49个的时候第50个数已经确认了。

设置一个阀值,每次判定一下随机数是否在这个阀值内,保证最后一个随机数加起来可以等于250。

总感觉没讲懂 =  = ,上代码吧

代码:

@Test

publicvoidtest2()

{

//测试100次

for(intj=0;j<100;j++){

Randomrdm=newRandom();

intbegin=1;

intend=80;

intcycleTimes=1000;

intsum=40000;

//数组放50个数用

intresult[]=newint[cycleTimes];

 

//阀值

intthreshold=sum/cycleTimes;

//最高阀值

inthigh=end-threshold;

intlow=begin-threshold;

//当前阀值

intthresholdNow=0;

 

////记录一下总的循环次数

intfroNo=0;

myJedisService.del("123456789");

for(inti=0;i<cycleTimes-1;i++){

//随机一个数

Integerrandom=rdm.nextInt(end-begin+1)+begin;

//判断是否在阈值内

intthresholdNowFor=random-threshold+thresholdNow;

if(thresholdNowFor<=high&&thresholdNowFor>=low){

thresholdNow=thresholdNowFor;

result[i]=random;

//记录一下每次成功的数字

myJedisService.hincrBy("123456789",random.toString(),1);

}else{

i=i-1;

}

//记录一下总的循环次数

froNo=froNo+1;

}

result[cycleTimes-1]=threshold-thresholdNow;

Integerasfdsdf=threshold-thresholdNow;

myJedisService.hincrBy("123456789",asfdsdf.toString(),1);

//打印用

intaaa=0;

for(inti=0;i<cycleTimes;i++){

aaa=aaa+result[i];

}

 

Map<String,String>justMap=myJedisService.hgetAll("123456789");

Integerxunhuancishu=0;

for(Map.Entry<String,String>entry:justMap.entrySet()){

System.out.println("数字:"+entry.getKey()+",循环次数"+entry.getValue());

xunhuancishu=xunhuancishu+Integer.valueOf(entry.getValue());

}

System.out.println("总数字数量:"+xunhuancishu.toString());

System.out.println("循环次数:"+froNo);

System.out.println("总:"+aaa);

}
}
注:效率超高,50个数一般就循环70次左右就能找到解,并且解有很大的随机性,良好情况下,循环次数n*1.3。

其他:

后来我又想出一个算法,这个算法比上一个随机性大,而且不容易出错,不过应该性能要求会高点,不适合a和b间隔过大,解过多的,那就是第一次用回溯算出所有的解,然后把解存起来,每一次随机一个解,这样的随机性比较大,适合a,b,n,s不变的,但是第一次效率较低,以后的效率较高。

总结:

其实后来想了一下这个算法的限制很多,虽然良好状态下循环次数n*1.3 但是如果结果太过于极端,比如 3-8 循环50 次  累加和为150 有解太极端,而且没有随机性,都是一个数,就不适合用,当阀值越接近( a+b )/2的时候效果最佳!而第二个适合必较固定的,没有一个万能的算法,只有相对的问题,用对应的解决算法,还是看需求,不过思考思考总是好的,而且阀值这个东西感觉也可以用到别的地放。

Thread的中断机制(interrupt) 建议阅读!

中断线程

线程的thread.interrupt()方法是中断线程,将会设置该线程的中断状态位,即设置为true,中断的结果线程是死亡、还是等待新的任务或是继续运行至下一步,就取决于这个程序本身。线程会不时地检测这个中断标示位,以判断线程是否应该被中断(中断标示值是否为true)。它并不像stop方法那样会中断一个正在运行的线程。

判断线程是否被中断

判断某个线程是否已被发送过中断请求,请使用Thread.currentThread().isInterrupted()方法(因为它将线程中断标示位设置为true后,不会立刻清除中断标示位,即不会将中断标设置为false),而不要使用thread.interrupted()(该方法调用后会将中断标示位清除,即重新设置为false)方法来判断,下面是线程在循环中时的中断方式:

while(!Thread.currentThread().isInterrupted() && more work to do){
    do more work
}

如何中断线程

如果一个线程处于了阻塞状态(如线程调用了thread.sleep、thread.join、thread.wait、1.5中的condition.await、以及可中断的通道上的 I/O 操作方法后可进入阻塞状态),则在线程在检查中断标示时如果发现中断标示为true,则会在这些阻塞方法(sleep、join、wait、1.5中的condition.await及可中断的通道上的 I/O 操作方法)调用处抛出InterruptedException异常,并且在抛出异常后立即将线程的中断标示位清除,即重新设置为false。抛出异常是为了线程从阻塞状态醒过来,并在结束线程前让程序员有足够的时间来处理中断请求。

 

注,synchronized在获锁的过程中是不能被中断的,意思是说如果产生了死锁,则不可能被中断(请参考后面的测试例子)。与synchronized功能相似的reentrantLock.lock()方法也是一样,它也不可中断的,即如果发生死锁,那么reentrantLock.lock()方法无法终止,如果调用时被阻塞,则它一直阻塞到它获取到锁为止。但是如果调用带超时的tryLock方法reentrantLock.tryLock(long timeout, TimeUnit unit),那么如果线程在等待时被中断,将抛出一个InterruptedException异常,这是一个非常有用的特性,因为它允许程序打破死锁。你也可以调用reentrantLock.lockInterruptibly()方法,它就相当于一个超时设为无限的tryLock方法。

 

没有任何语言方面的需求一个被中断的线程应该终止。中断一个线程只是为了引起该线程的注意,被中断线程可以决定如何应对中断。某些线程非常重要,以至于它们应该不理会中断,而是在处理完抛出的异常之后继续执行,但是更普遍的情况是,一个线程将把中断看作一个终止请求,这种线程的run方法遵循如下形式:

复制代码
public void run() {
    try {
        ...
        /*
         * 不管循环里是否调用过线程阻塞的方法如sleep、join、wait,这里还是需要加上
         * !Thread.currentThread().isInterrupted()条件,虽然抛出异常后退出了循环,显
         * 得用阻塞的情况下是多余的,但如果调用了阻塞方法但没有阻塞时,这样会更安全、更及时。
         */
        while (!Thread.currentThread().isInterrupted()&& more work to do) {
            do more work 
        }
    } catch (InterruptedException e) {
        //线程在wait或sleep期间被中断了
    } finally {
        //线程结束前做一些清理工作
    }
}
复制代码

上面是while循环在try块里,如果try在while循环里时,因该在catch块里重新设置一下中断标示,因为抛出InterruptedException异常后,中断标示位会自动清除,此时应该这样:

复制代码
public void run() {
    while (!Thread.currentThread().isInterrupted()&& more work to do) {
        try {
            ...
            sleep(delay);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();//重新设置中断标示
        }
    }
}
复制代码

底层中断异常处理方式

另外不要在你的底层代码里捕获InterruptedException异常后不处理,会处理不当,如下:

复制代码
void mySubTask(){
    ...
    try{
        sleep(delay);
    }catch(InterruptedException e){}//不要这样做
    ...
}
复制代码

如果你不知道抛InterruptedException异常后如何处理,那么你有如下好的建议处理方式:
1、在catch子句中,调用Thread.currentThread.interrupt()来设置中断状态(因为抛出异常后中断标示会被清除),让外界通过判断Thread.currentThread().isInterrupted()标示来决定是否终止线程还是继续下去,应该这样做:

复制代码
void mySubTask() {
    ...
    try {
        sleep(delay);
    } catch (InterruptedException e) {
        Thread.currentThread().isInterrupted();
    }
    ...
}
复制代码

2、或者,更好的做法就是,不使用try来捕获这样的异常,让方法直接抛出:

void mySubTask() throws InterruptedException {
    ...
    sleep(delay);
    ...
}

中断应用

使用中断信号量中断非阻塞状态的线程

中断线程最好的,最受推荐的方式是,使用共享变量(shared variable)发出信号,告诉线程必须停止正在运行的任务。线程必须周期性的核查这一变量,然后有秩序地中止任务。Example2描述了这一方式:

复制代码
class Example2 extends Thread {
    volatile boolean stop = false;// 线程中断信号量

    public static void main(String args[]) throws Exception {
        Example2 thread = new Example2();
        System.out.println("Starting thread...");
        thread.start();
        Thread.sleep(3000);
        System.out.println("Asking thread to stop...");
        // 设置中断信号量
        thread.stop = true;
        Thread.sleep(3000);
        System.out.println("Stopping application...");
    }

    public void run() {
        // 每隔一秒检测一下中断信号量
        while (!stop) {
            System.out.println("Thread is running...");
            long time = System.currentTimeMillis();
            /*
             * 使用while循环模拟 sleep 方法,这里不要使用sleep,否则在阻塞时会 抛
             * InterruptedException异常而退出循环,这样while检测stop条件就不会执行,
             * 失去了意义。
             */
            while ((System.currentTimeMillis() - time < 1000)) {}
        }
        System.out.println("Thread exiting under request...");
    }
}
复制代码

使用thread.interrupt()中断非阻塞状态线程

虽然Example2该方法要求一些编码,但并不难实现。同时,它给予线程机会进行必要的清理工作。这里需注意一点的是需将共享变量定义成volatile 类型或将对它的一切访问封入同步的块/方法(synchronized blocks/methods)中。上面是中断一个非阻塞状态的线程的常见做法,但对非检测isInterrupted()条件会更简洁:

复制代码
class Example2 extends Thread {
    public static void main(String args[]) throws Exception {
        Example2 thread = new Example2();
        System.out.println("Starting thread...");
        thread.start();
        Thread.sleep(3000);
        System.out.println("Asking thread to stop...");
        // 发出中断请求
        thread.interrupt();
        Thread.sleep(3000);
        System.out.println("Stopping application...");
    }

    public void run() {
        // 每隔一秒检测是否设置了中断标示
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println("Thread is running...");
            long time = System.currentTimeMillis();
            // 使用while循环模拟 sleep
            while ((System.currentTimeMillis() - time < 1000) ) {
            }
        }
        System.out.println("Thread exiting under request...");
    }
}
复制代码

到目前为止一切顺利!但是,当线程等待某些事件发生而被阻塞,又会发生什么?当然,如果线程被阻塞,它便不能核查共享变量,也就不能停止。这在许多情况下会发生,例如调用Object.wait()、ServerSocket.accept()和DatagramSocket.receive()时,这里仅举出一些。

 

他们都可能永久的阻塞线程。即使发生超时,在超时期满之前持续等待也是不可行和不适当的,所以,要使用某种机制使得线程更早地退出被阻塞的状态。下面就来看一下中断阻塞线程技术。

使用thread.interrupt()中断阻塞状态线程

Thread.interrupt()方法不会中断一个正在运行的线程。这一方法实际上完成的是,设置线程的中断标示位,在线程受到阻塞的地方(如调用sleep、wait、join等地方)抛出一个异常InterruptedException,并且中断状态也将被清除,这样线程就得以退出阻塞的状态。下面是具体实现:

复制代码
class Example3 extends Thread {
    public static void main(String args[]) throws Exception {
        Example3 thread = new Example3();
        System.out.println("Starting thread...");
        thread.start();
        Thread.sleep(3000);
        System.out.println("Asking thread to stop...");
        thread.interrupt();// 等中断信号量设置后再调用
        Thread.sleep(3000);
        System.out.println("Stopping application...");
    }

    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println("Thread running...");
            try {
                /*
                 * 如果线程阻塞,将不会去检查中断信号量stop变量,所 以thread.interrupt()
                 * 会使阻塞线程从阻塞的地方抛出异常,让阻塞线程从阻塞状态逃离出来,并
                 * 进行异常块进行 相应的处理
                 */
                Thread.sleep(1000);// 线程阻塞,如果线程收到中断操作信号将抛出异常
            } catch (InterruptedException e) {
                System.out.println("Thread interrupted...");
                /*
                 * 如果线程在调用 Object.wait()方法,或者该类的 join() 、sleep()方法
                 * 过程中受阻,则其中断状态将被清除
                 */
                System.out.println(this.isInterrupted());// false

                //中不中断由自己决定,如果需要真真中断线程,则需要重新设置中断位,如果
                //不需要,则不用调用
                Thread.currentThread().interrupt();
            }
        }
        System.out.println("Thread exiting under request...");
    }
}
复制代码

一旦Example3中的Thread.interrupt()被调用,线程便收到一个异常,于是逃离了阻塞状态并确定应该停止。上面我们还可以使用共享信号量来替换!Thread.currentThread().isInterrupted()条件,但不如它简洁。

死锁状态线程无法被中断

Example4试着去中断处于死锁状态的两个线程,但这两个线都没有收到任何中断信号(抛出异常),所以interrupt()方法是不能中断死锁线程的,因为锁定的位置根本无法抛出异常:

复制代码
class Example4 extends Thread {
    public static void main(String args[]) throws Exception {
        final Object lock1 = new Object();
        final Object lock2 = new Object();
        Thread thread1 = new Thread() {
            public void run() {
                deathLock(lock1, lock2);
            }
        };
        Thread thread2 = new Thread() {
            public void run() {
                // 注意,这里在交换了一下位置
                deathLock(lock2, lock1);
            }
        };
        System.out.println("Starting thread...");
        thread1.start();
        thread2.start();
        Thread.sleep(3000);
        System.out.println("Interrupting thread...");
        thread1.interrupt();
        thread2.interrupt();
        Thread.sleep(3000);
        System.out.println("Stopping application...");
    }

    static void deathLock(Object lock1, Object lock2) {
        try {
            synchronized (lock1) {
                Thread.sleep(10);// 不会在这里死掉
                synchronized (lock2) {// 会锁在这里,虽然阻塞了,但不会抛异常
                    System.out.println(Thread.currentThread());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
复制代码

中断I/O操作

然而,如果线程在I/O操作进行时被阻塞,又会如何?I/O操作可以阻塞线程一段相当长的时间,特别是牵扯到网络应用时。例如,服务器可能需要等待一个请求(request),又或者,一个网络应用程序可能要等待远端主机的响应。

 

实现此InterruptibleChannel接口的通道是可中断的:如果某个线程在可中断通道上因调用某个阻塞的 I/O 操作(常见的操作一般有这些:serverSocketChannel. accept()、socketChannel.connect、socketChannel.open、socketChannel.read、socketChannel.write、fileChannel.read、fileChannel.write)而进入阻塞状态,而另一个线程又调用了该阻塞线程的 interrupt 方法,这将导致该通道被关闭,并且已阻塞线程接将会收到ClosedByInterruptException,并且设置已阻塞线程的中断状态。另外,如果已设置某个线程的中断状态并且它在通道上调用某个阻塞的 I/O 操作,则该通道将关闭并且该线程立即接收到 ClosedByInterruptException;并仍然设置其中断状态。如果情况是这样,其代码的逻辑和第三个例子中的是一样的,只是异常不同而已。

 

如果你正使用通道(channels)(这是在Java 1.4中引入的新的I/O API),那么被阻塞的线程将收到一个ClosedByInterruptException异常。但是,你可能正使用Java1.0之前就存在的传统的I/O,而且要求更多的工作。既然这样,Thread.interrupt()将不起作用,因为线程将不会退出被阻塞状态。Example5描述了这一行为。尽管interrupt()被调用,线程也不会退出被阻塞状态,比如ServerSocket的accept方法根本不抛出异常。

 

很幸运,Java平台为这种情形提供了一项解决方案,即调用阻塞该线程的套接字的close()方法。在这种情形下,如果线程被I/O操作阻塞,当调用该套接字的close方法时,该线程在调用accept地方法将接收到一个SocketException(SocketException为IOException的子异常)异常,这与使用interrupt()方法引起一个InterruptedException异常被抛出非常相似,(注,如果是流因读写阻塞后,调用流的close方法也会被阻塞,根本不能调用,更不会抛IOExcepiton,此种情况下怎样中断?我想可以转换为通道来操作流可以解决,比如文件通道)。下面是具体实现:

复制代码
class Example6 extends Thread {
    volatile ServerSocket socket;

    public static void main(String args[]) throws Exception {
        Example6 thread = new Example6();
        System.out.println("Starting thread...");
        thread.start();
        Thread.sleep(3000);
        System.out.println("Asking thread to stop...");
        Thread.currentThread().interrupt();// 再调用interrupt方法
        thread.socket.close();// 再调用close方法
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
        }
        System.out.println("Stopping application...");
    }

    public void run() {
        try {
            socket = new ServerSocket(8888);
        } catch (IOException e) {
            System.out.println("Could not create the socket...");
            return;
        }
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println("Waiting for connection...");
            try {
                socket.accept();
            } catch (IOException e) {
                System.out.println("accept() failed or interrupted...");
                Thread.currentThread().interrupt();//重新设置中断标示位
            }
        }
        System.out.println("Thread exiting under request...");
    }
}
复制代码

———————————————————————————————————————————————————

一、没有任何语言方面的需求一个被中断的线程应该终止。中断一个线程只是为了引起该线程的注意,被中断线程可以决定如何应对中断。

二、对于处于sleep,join等操作的线程,如果被调用interrupt()后,会抛出InterruptedException,然后线程的中断标志位会由true重置为false,因为线程为了处理异常已经重新处于就绪状态。

三、不可中断的操作,包括进入synchronized段以及Lock.lock(),inputSteam.read()等,调用interrupt()对于这几个问题无效,因为它们都不抛出中断异常。如果拿不到资源,它们会无限期阻塞下去。

对于Lock.lock(),可以改用Lock.lockInterruptibly(),可被中断的加锁操作,它可以抛出中断异常。等同于等待时间无限长的Lock.tryLock(long time, TimeUnit unit)。

对于inputStream等资源,有些(实现了interruptibleChannel接口)可以通过close()方法将资源关闭,对应的阻塞也会被放开。

首先,看看Thread类里的几个方法:

public static boolean interrupted 测试当前线程是否已经中断。线程的中断状态 由该方法清除。换句话说,如果连续两次调用该方法,则第二次调用将返回 false。

public boolean isInterrupted()

测试线程是否已经中断。线程的中断状态 不受该方法的影响。

public void interrupt()

中断线程。

上面列出了与中断有关的几个方法及其行为,可以看到interrupt是中断线程。如果不了解Java的中断机制,这样的一种解释极容易造成误解,认为调用了线程的interrupt方法就一定会中断线程。

其实,Java的中断是一种协作机制。也就是说调用线程对象的interrupt方法并不一定就中断了正在运行的线程,它只是要求线程自己在合适的时机中断自己。每个线程都有一个boolean的中断状态(这个状态不在Thread的属性上),interrupt方法仅仅只是将该状态置为true。

比如对正常运行的线程调用interrupt()并不能终止他,只是改变了interrupt标示符。

一般说来,如果一个方法声明抛出InterruptedException,表示该方法是可中断的,比如wait,sleep,join,也就是说可中断方法会对interrupt调用做出响应(例如sleep响应interrupt的操作包括清除中断状态,抛出InterruptedException),异常都是由可中断方法自己抛出来的,并不是直接由interrupt方法直接引起的。

Object.wait, Thread.sleep方法,会不断的轮询监听 interrupted 标志位,发现其设置为true后,会停止阻塞并抛出 InterruptedException异常。

————————————————————————————————————————————————————–

看了以上的说明,对java中断的使用肯定是会了,但我想知道的是阻塞了的线程是如何通过interuppt方法完成停止阻塞并抛出interruptedException的,这就要看Thread中native的interuppt0方法了。

第一步学习Java的JNI调用Native方法。

 

第二步下载openjdk的源代码,找到目录结构里的openjdk-src\jdk\src\share\native\java\lang\Thread.c文件。

复制代码
#include "jni.h"
#include "jvm.h"

#include "java_lang_Thread.h"

#define THD "Ljava/lang/Thread;"
#define OBJ "Ljava/lang/Object;"
#define STE "Ljava/lang/StackTraceElement;"

#define ARRAY_LENGTH(a) (sizeof(a)/sizeof(a[0]))

static JNINativeMethod methods[] = {
    {"start0",           "()V",        (void *)&JVM_StartThread},
    {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
    {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
    {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
    {"resume0",          "()V",        (void *)&JVM_ResumeThread},
    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
    {"yield",            "()V",        (void *)&JVM_Yield},
    {"sleep",            "(J)V",       (void *)&JVM_Sleep},
    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
    {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},
    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},
    {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},
    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
    {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
};

#undef THD
#undef OBJ
#undef STE

JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
    (*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}

为什么 Java 要把字符串设计成不可变的

String是Java中一个不可变的类,所以他一旦被实例化就无法被修改。不可变类的实例一旦创建,其成员变量的值就不能被修改。不可变类有很多优势。本文总结了为什么字符串被设计成不可变的。将涉及到内存、同步和数据结构相关的知识。

字符串池

字符串池是方法区中的一部分特殊存储。当一个字符串被被创建的时候,首先会去这个字符串池中查找,如果找到,直接返回对该字符串的引用。

下面的代码只会在堆中创建一个字符串

String string1 = “abcd”;
String string2 = “abcd”;

那么在堆里面,只有一个对象”abcd”

如果字符串可变的话,当两个引用指向指向同一个字符串时,对其中一个做修改就会影响另外一个。(请记住该影响,有助于理解后面的内容)

缓存Hashcode

Java中经常会用到字符串的哈希码(hashcode)。例如,在HashMap中,字符串的不可变能保证其hashcode永远保持一致,这样就可以避免一些不必要的麻烦。这也就意味着每次在使用一个字符串的hashcode的时候不用重新计算一次,这样更加高效。

在String类中,有以下代码:

private int hash;//this is used to cache hash code.

以上代码中hash变量中就保存了一个String对象的hashcode,因为String类不可变,所以一旦对象被创建,该hash值也无法改变。所以,每次想要使用该对象的hashcode的时候,直接返回即可。

使其他类的使用更加便利

在介绍这个内容之前,先看以下代码:

HashSet set = new HashSet();
set.add(new String(“a”));
set.add(new String(“b”));
set.add(new String(“c”));

for(String a: set)
a.value = “a”;

在上面的例子中,如果字符串可以被改变,那么以上用法将有可能违反Set的设计原则,因为Set要求其中的元素不可以重复。上面的代码只是为了简单说明该问题,其实String类中并没有value这个字段值。

安全性

String被广泛的使用在其他Java类中充当参数。比如网络连接、打开文件等操作。如果字符串可变,那么类似操作可能导致安全问题。因为某个方法在调用连接操作的时候,他认为会连接到某台机器,但是实际上并没有(其他引用同一String对象的值修改会导致该连接中的字符串内容被修改)。可变的字符串也可能导致反射的安全问题,因为他的参数也是字符串。

代码示例:

boolean connect(string s){
if (!isSecure(s)) {
throw new SecurityException();
}
//如果s在该操作之前被其他的引用所改变,那么就可能导致问题。
causeProblem(s);
}

不可变对象天生就是线程安全的

因为不可变对象不能被改变,所以他们可以自由地在多个线程之间共享。不需要任何同步处理。

总之,String被设计成不可变的主要目的是为了安全和高效。所以,使String是一个不可变类是一个很好的设计。

CAT源码解析

CAT(Central Application Tracking)是一个实时和接近全量的监控系统,它侧重于对Java应用的监控,基本接入了美团点评上海侧所有核心应用。目前在中间件(MVC、RPC、数据库、缓存等)框架中得到广泛应用,为美团点评各业务线提供系统的性能指标、健康状况、监控告警等。自2014年开源以来,除了美团点评之外,CAT还在携程、陆金所、猎聘网、找钢网等多家互联网公司生产环境应用,项目的开源地址是http://github.com/dianping/cat

本文会对CAT整体设计、客户端、服务端等的一些设计思路做详细深入的介绍。

背景介绍

CAT整个产品研发是从2011年底开始的,当时正是大众点评从.NET迁移到Java的核心起步阶段。当初大众点评已经有核心的基础中间件、RPC组件Pigeon、统一配置组件Lion。整体Java迁移已经在服务化的路上。随着服务化的深入,整体Java在线上部署规模逐渐变多,同时,暴露的问题也越来越多。典型的问题有:

  • 大量报错,特别是核心服务,需要花很久时间才能定位。
  • 异常日志都需要线上权限登陆线上机器排查,排错时间长。
  • 有些简单的错误定位都非常困难(一次将线上的库配置到了Beta,花了整个通宵排错)。
  • 很多不了了之的问题怀疑是网络问题(从现在看,内网真的很少出问题)。

虽然那时候也有一些简单的监控工具(比如Zabbix,自己研发的Hawk系统等),可能单个工具在某方面的功能还不错,但整体服务化水平参差不齐、扩展能力相对较弱,监控工具间不能互通互联,使得查找问题根源基本都需要在多个系统之间切换,有时候真的是靠“人品”才能找出根源。

适逢在eBay工作长达十几年的吴其敏加入大众点评成为首席架构师,他对eBay内部应用非常成功的CAL系统有深刻的理解。就在这样天时地利人和的情况下,我们开始研发了大众点评第一代监控系统——CAT。

CAT的原型和理念来源于eBay的CAL系统,最初是吴其敏在大众点评工作期间设计开发的。他之前曾CAT不仅增强了CAL系统核心模型,还添加了更丰富的报表。

整体设计

监控整体要求就是快速发现故障、快速定位故障以及辅助进行程序性能优化。为了做到这些,我们对监控系统的一些非功能做了如下的要求:

  • 实时处理:信息的价值会随时间锐减,尤其是事故处理过程中。
  • 全量数据:最开始的设计目标就是全量采集,全量的好处有很多。
  • 高可用:所有应用都倒下了,需要监控还站着,并告诉工程师发生了什么,做到故障还原和问题定位。
  • 故障容忍:CAT本身故障不应该影响业务正常运转,CAT挂了,应用不该受影响,只是监控能力暂时减弱。
  • 高吞吐:要想还原真相,需要全方位地监控和度量,必须要有超强的处理吞吐能力。
  • 可扩展:支持分布式、跨IDC部署,横向扩展的监控系统。
  • 不保证可靠:允许消息丢失,这是一个很重要的trade-off,目前CAT服务端可以做到4个9的可靠性,可靠系统和不可靠性系统的设计差别非常大。

CAT从开发至今,一直秉承着简单的架构就是最好的架构原则,主要分为三个模块:CAT-client、CAT-consumer、CAT-home。

  • Cat-client 提供给业务以及中间层埋点的底层SDK。
  • Cat-consumer 用于实时分析从客户端提供的数据。
  • Cat-home 作为用户给用户提供展示的控制端。

在实际开发和部署中,Cat-consumer和Cat-home是部署在一个JVM内部,每个CAT服务端都可以作为consumer也可以作为home,这样既能减少整个层级结构,也可以增加系统稳定性。

上图是CAT目前多机房的整体结构图,图中可见:

  • 路由中心是根据应用所在机房信息来决定客户端上报的CAT服务端地址,目前美团点评有广州、北京、上海三地机房。
  • 每个机房内部都有独立的原始信息存储集群HDFS。
  • CAT-home可以部署在一个机房也可以部署在多个机房,在最后做展示的时候,home会从consumer中进行跨机房的调用,将所有的数据合并展示给用户。
  • 实际过程中,consumer、home以及路由中心都是部署在一起的,每个服务端节点都可以充当任何一个角色。
客户端设计

客户端设计是CAT系统设计中最为核心的一个环节,客户端要求是做到API简单、高可靠性能,无论在任何场景下都不能影响客业务性能,监控只是公司核心业务流程一个旁路环节。CAT核心客户端是Java,也支持Net客户端,近期公司内部也在研发其他多语言客户端。以下客户端设计及细节均以Java客户端为模板。

设计架构

CAT客户端在收集端数据方面使用ThreadLocal(线程局部变量),是线程本地变量,也可以称之为线程本地存储。其实ThreadLocal的功用非常简单,就是为每一个使用该变量的线程都提供一个变量值的副本,属于Java中一种较为特殊的线程绑定机制,每一个线程都可以独立地改变自己的副本,不会和其它线程的副本冲突。

在监控场景下,为用户提供服务都是Web容器,比如tomcat或者Jetty,后端的RPC服务端比如Dubbo或者Pigeon,也都是基于线程池来实现的。业务方在处理业务逻辑时基本都是在一个线程内部调用后端服务、数据库、缓存等,将这些数据拿回来再进行业务逻辑封装,最后将结果展示给用户。所以将所有的监控请求作为一个监控上下文存入线程变量就非常合适。

如上图所示,业务执行业务逻辑的时候,就会把此次请求对应的监控存放于线程上下文中,存于上下文的其实是一个监控树的结构。在最后业务线程执行结束时,将监控对象存入一个异步内存队列中,CAT有个消费线程将队列内的数据异步发送到服务端。

API设计

监控API定义往往取决于对监控或者性能分析这个领域的理解,监控和性能分析所针对的场景有如下几种:

  • 一段代码的执行时间,一段代码可以是URL执行耗时,也可以是SQL的执行耗时。
  • 一段代码的执行次数,比如Java抛出异常记录次数,或者一段逻辑的执行次数。
  • 定期执行某段代码,比如定期上报一些核心指标:JVM内存、GC等指标。
  • 关键的业务监控指标,比如监控订单数、交易额、支付成功率等。

在上述领域模型的基础上,CAT设计自己核心的几个监控对象:Transaction、Event、Heartbeat、Metric。

一段监控API的代码示例如下:

序列化和通信

序列化和通信是整个客户端包括服务端性能里面很关键的一个环节。

  • CAT序列化协议是自定义序列化协议,自定义序列化协议相比通用序列化协议要高效很多,这个在大规模数据实时处理场景下还是非常有必要的。
  • CAT通信是基于Netty来实现的NIO的数据传输,Netty是一个非常好的NIO开发框架,在这边就不详细介绍了。

客户端埋点

日志埋点是监控活动的最重要环节之一,日志质量决定着监控质量和效率。当前CAT的埋点目标是以问题为中心,像程序抛出exception就是典型问题。我个人对问题的定义是:不符合预期的就可以算问题,比如请求未完成、响应时间快了慢了、请求TPS多了少了、时间分布不均匀等等。

在互联网环境中,最突出的问题场景,突出的理解是:跨越边界的行为。包括但不限于:

  • HTTP/REST、RPC/SOA、MQ、Job、Cache、DAL;
  • 搜索/查询引擎、业务应用、外包系统、遗留系统;
  • 第三方网关/银行, 合作伙伴/供应商之间;
  • 各类业务指标,如用户登录、订单数、支付状态、销售额。

遇到的问题

通常Java客户端在业务上使用容易出问题的地方就是内存,另外一个是CPU。内存往往是内存泄露,占用内存较多导致业务方GC压力增大; CPU开销最终就是看代码的性能。

以前我们遇到过一个极端的例子,我们一个业务请求做餐饮加商铺的销售额,业务一般会通过for循环所有商铺的分店,结果就造成内存OOM了,后来发现这家店是肯德基,有几万分店,每个循环里面都会有数据库连接。在正常场景下,ThreadLocal内部的监控一个对象就存在几万个节点,导致业务Oldgc特别严重。所以说框架的代码是不能想象业务方会怎么用你的代码,需要考虑到任何情况下都有出问题的可能。

在消耗CPU方面我们也遇到一个case:在某个客户端版本,CAT本地存储当前消息ID自增的大小,客户端使用了MappedByteBuffer这个类,这个类是一个文件内存映射,测试下来这个类的性能非常高,我们仅仅用这个存储了几个字节的对象,正常情况理论上不会有任何问题。在一次线上场景下,很多业务线程都block在这个上面,结果发现当本身这台机器IO存在瓶颈时候,这个也会变得很慢。后来的优化就是把这个IO的操作异步化,所以客户端需要尽可能异步化,异步化序列化、异步化传输、异步化任何可能存在时间延迟的代码操作。

服务端设计

服务端主要的问题是大数据的实时处理,目前后端CAT的计算集群大约35台物理机,存储集群大约35台物理机,每天处理了约100TB的数据量。线上单台机器高峰期大约是110MB/s,接近千兆网打满。

下面我重点讲下CAT服务端一些设计细节。

架构设计

在最初的整体介绍中已经画了架构图,这边介绍下单机的consumer中大概的结构如下:

如上图,CAT服务端在整个实时处理中,基本上实现了全异步化处理。

  • 消息接受是基于Netty的NIO实现。
  • 消息接受到服务端就存放内存队列,然后程序开启一个线程会消费这个消息做消息分发。
  • 每个消息都会有一批线程并发消费各自队列的数据,以做到消息处理的隔离。
  • 消息存储是先存入本地磁盘,然后异步上传到HDFS文件,这也避免了强依赖HDFS。

当某个报表处理器处理来不及时候,比如Transaction报表处理比较慢,可以通过配置支持开启多个Transaction处理线程,并发消费消息。

实时分析

CAT服务端实时报表分析是整个监控系统的核心,CAT重客户端采集的是是原始的logview,目前一天大约有1000亿的消息,这些原始的消息太多了,所以需要在这些消息基础上实现丰富报表,来支持业务问题及性能分析的需要。

CAT是根据日志消息的特点(比如只读特性)和问题场景,量身定做的,它将所有的报表按消息的创建时间,一小时为单位分片,那么每小时就产生一个报表。当前小时报表的所有计算都是基于内存的,用户每次请求即时报表得到的都是最新的实时结果。对于历史报表,因为它是不变的,所以实时不实时也就无所谓了。

CAT基本上所有的报表模型都可以增量计算,它可以分为:计数、计时和关系处理三种。计数又可以分为两类:算术计数和集合计数。典型的算术计数如:总个数(count)、总和(sum)、均值(avg)、最大/最小(max/min)、吞吐(tps)和标准差(std)等,其他都比较直观,标准差稍微复杂一点,大家自己可以推演一下怎么做增量计算。那集合运算,比如95线(表示95%请求的完成时间)、999线(表示99.9%请求的完成时间),则稍微复杂一些,系统开销也更大一点。

报表建模

CAT每个报表往往有多个维度,以transaction报表为例,它有5个维度,分别是应用、机器、Type、Name和分钟级分布情况。如果全维度建模,虽然灵活,但开销将会非常之大。CAT选择固定维度建模,可以理解成将这5个维度组织成深度为5的树,访问时总是从根开始,逐层往下进行。

CAT服务端为每个报表单独分配一个线程,所以不会有锁的问题,所有报表模型都是非线程安全的,其数据是可变的。这样带来的好处是简单且低开销。

CAT报表建模是使用自研的Maven Plugin自动生成的。所有报表是可合并和裁剪的,可以轻易地将2个或多个报表合并成一个报表。在报表处理代码中,CAT大量使用访问者模式(visitor pattern)。

性能分析报表

故障发现报表

  • 实时业务指标监控 :核心业务都会定义自己的业务指标,这不需要太多,主要用于24小时值班监控,实时发现业务指标问题,图中一个是当前的实际值,一个是基准值,就是根据历史趋势计算的预测值。如下图就是当时的情景,能直观看到支付业务出问题的故障。
  • 系统报错大盘。
  • 实时数据库大盘、服务大盘、缓存大盘等。

存储设计

CAT系统的存储主要有两块:

  • CAT的报表的存储。
  • CAT原始logview的存储。

报表是根据logview实时运算出来的给业务分析用的报表,默认报表有小时模式、天模式、周模式以及月模式。CAT实时处理报表都是产生小时级别统计,小时级报表中会带有最低分钟级别粒度的统计。天、周、月等报表都是在小时级别报表合并的结果报表。

原始logview存储一天大约100TB的数据量,因为数据量比较大所以存储必须要要压缩,本身原始logview需要根据Message-ID读取,所以存储整体要求就是批量压缩以及随机读。在当时场景下,并没有特别合适成熟的系统以支持这样的特性,所以我们开发了一种基于文件的存储以支持CAT的场景,在存储上一直是最难的问题,我们一直在这块持续的改进和优化。

消息ID的设计

CAT每个消息都有一个唯一的ID,这个ID在客户端生成,后续都通过这个ID在进行消息内容的查找。典型的RPC消息串起来的问题,比如A调用B的时候,在A这端生成一个Message-ID,在A调用B的过程中,将Message-ID作为调用传递到B端,在B执行过程中,B用context传递的Message-ID作为当前监控消息的Message-ID。

CAT消息的Message-ID格式ShopWeb-0a010680-375030-2,CAT消息一共分为四段:

  • 第一段是应用名shop-web。
  • 第二段是当前这台机器的IP的16进制格式,01010680表示10.1.6.108。
  • 第三段的375030,是系统当前时间除以小时得到的整点数。
  • 第四段的2,是表示当前这个客户端在当前小时的顺序递增号。

存储数据的设计

消息存储是CAT最有挑战的部分。关键问题是消息数量多且大,目前美团点评每天处理消息1000亿左右,大小大约100TB,单物理机高峰期每秒要处理100MB左右的流量。CAT服务端基于此流量做实时计算,还需要将这些数据压缩后写入磁盘。

整体存储结构如下图:

CAT在写数据一份是Index文件,一份是Data文件.

  • Data文件是分段GZIP压缩,每个分段大小小于64K,这样可以用16bits可以表示一个最大分段地址。
  • 一个Message-ID都用需要48bits的大小来存索引,索引根据Message-ID的第四段来确定索引的位置,比如消息Message-ID为ShopWeb-0a010680-375030-2,这条消息ID对应的索引位置为2*48bits的位置。
  • 48bits前面32bits存数据文件的块偏移地址,后面16bits存数据文件解压之后的块内地址偏移。
  • CAT读取消息的时候,首先根据Message-ID的前面三段确定唯一的索引文件,在根据Message-ID第四段确定此Message-ID索引位置,根据索引文件的48bits读取数据文件的内容,然后将数据文件进行GZIP解压,在根据块内便宜地址读取出真正的消息内容。

服务端设计总结

CAT在分布式实时方面,主要归结于以下几点因素:

  • 去中心化,数据分区处理。
  • 基于日志只读特性,以一个小时为时间窗口,实时报表基于内存建模和分析,历史报表通过聚合完成。
  • 基于内存队列,全面异步化、单线程化、无锁设计。
  • 全局消息ID,数据本地化生产,集中式存储。
  • 组件化、服务化理念。

总结感悟

最后我们再花一点点时间来讲一下我们在实践里做的一些东西。

一、MVP版本,Demo版本用了1个月,MVP版本用了3个月。

为什么强调MVP版本?因为做这个项目需要老板和业务的支持。大概在2011年左右,我们整个生产环境估计也有一千台机器(虚拟机),一旦出现问题就到运维那边看日志,看日志的痛苦大家都应该理解,这时候发现一台机器核心服务出错,可能会导致更多的问题。我们就做了MVP版本解决这个问题,当时我们大概做了两个功能:一个是实时知道所有的API接口访问量成功率等;第二是实时能在CAT平台上看到异常日志。这里我想说的是MVP版本不要做太多内容,但是在做一个产品的时候必须从MVP版本做起,要做一些最典型特别亮眼的功能让大家支持你。

二、数据质量。数据质量是整个监控体系里面非常关键,它决定你最后的监控报表质量。所以我们要和跟数据库框架、缓存框架、RPC框架、Web框架等做深入的集成,让业务方便收集以及看到这些数据。

三、单机开发环境,这也是我们认为对整个项目开发效率提升最重要的一点。单机开发环境实际上就是说你在一台机器里可以把你所有的项目都启起来。如果你在一个单机环境下把所有东西启动起来,你就会想方设法地知道我依赖的服务挂了我怎么办?比如CAT依赖了HDFS。单机开发环境除了大幅度提高你的项目开发效率之外,还能提升你整个项目的可靠性。

四、最难的事情是项目上线推动。CAT整个项目大概有两三个人,当时白天都是支持业务上线,培训,晚上才能code,但是一旦随着产品和完善以及业务使用逐渐变多,一些好的产品后面会形成良性循环,推广就会变得比较容易。

五、开放生态。公司越大监控的需求越多,报表需求也更多,比如我们美团点评,产品有很多报表,整个技术体系里面也有很多报表非常多的自定义报表,很多业务方都提各自的需求。最后我们决定把整个CAT系统里面所有的数据都作为API暴露出去,这些需求并不是不能支持,而是这事情根本是做不完的。美团点评内部下游有很多系统依赖CAT的数据,来做进一步的报表展示。

GreenDao教程2

总述:

所有的增删改查都需要通过greendao通过实体对象类生成的Dao来实现,

具体实现如下图

1、初始化数据库操作对象(GreenDao自动生成的操作对象)

2、通过数据库操作对象,进行增删改查操作

Tips

添加的记录需要初始化数据对象里面的数据

可以多次使用where(),进行多次筛选,也可以使用whereOr()语句,进行或语句查找

删除语句一般都是需要先进行一次查询,然后根据查询结果的list进行遍历,进行删除

修改语句一般都是需要先进行一次查询,然后根据查询结果的list进行遍历,对每一个对象进行相应的数据修改,之后再进行修改操作

 

数据库和数据仓库的区别

整理自知乎问答,非原创:数据库 与 数据仓库的本质区别是什么?

举个最常见的例子,拿电商行业来说好了。

基本每家电商公司都会经历,从只需要业务数据库到要数据仓库的阶段。

* 电商早期启动非常容易,入行门槛低。找个外包团队,做了一个可以下单的网页前端 + 几台服务器 + 一个MySQL,就能开门迎客了。这好比手工作坊时期。

* 第二阶段,流量来了,客户和订单都多起来了,普通查询已经有压力了,这个时候就需要升级架构变成多台服务器和多个业务数据库(量大+分库分表),这个阶段的业务数字和指标还可以勉强从业务数据库里查询。初步进入工业化。

* 第三个阶段,一般需要 3-5 年左右的时间,随着业务指数级的增长,数据量的会陡增,公司角色也开始多了起来,开始有了 CEO、CMO、CIO,大家需要面临的问题越来越复杂,越来越深入。高管们关心的问题,从最初非常粗放的:“昨天的收入是多少”、“上个月的 PV、UV 是多少”,逐渐演化到非常精细化和具体的用户的集群分析,特定用户在某种使用场景中,例如“20~30岁女性用户在过去五年的第一季度化妆品类商品的购买行为与公司进行的促销活动方案之间的关系”。

具体分析二者的不同:
* 数据库 OLTP(Online Transaction Processing) ;数据仓库 OLAP(Online Analytical Processing)
* 业务数据库中的数据结构是为了完成交易而设计的,数据仓库是为了查询和分析的便利设计的。
* 数据库通常追求交易的速度,交易完整性,数据的一致性,等等,在数据库模型上主要遵从范式模型(1NF,2NF,3NF,等等),从而尽可能减少数据冗余,保证引用完整性;而数据仓库强调数据分析的效率,复杂查询的速度,数据之间的相关性分析,所以在数据库模型上,数据仓库喜欢使用多维模型,从而提高数据分析的效率。
* 业务数据库面向的是业务人员,而数据仓库面向的是分析人员
* 数据库一般存储在线交易数据,数据实时在变;数据仓库存储的一般是历史数据,固定间隔形成快照