向七牛上传zip包

# -*- coding: utf-8 -*-
# flake8: noqa
from qiniu import Auth
from qiniu import BucketManager
import mysql.connector
sumr = 0
count = 0
real_count = 0
access_key = ‘WR5JtPxdsdYOgladSBDXZl3idmqjBzqkX5E-UDfG’
secret_key = ‘QOsocxMR2BN9eNZek3gjKeP2j_KznEl-H0JOg5g9’
# 初始化Auth状态
q = Auth(access_key, secret_key)
# 初始化BucketManager
bucket = BucketManager(q)
# 你要测试的空间, 并且这个key在你空间中存在
bucket_name = ‘axx-homework’

conn = mysql.connector.connect(user=’aixuexidball’, password=’aixuexi_2016dball’,host=’rm-2zefci7s9f4bpy9gt.mysql.rds.aliyuncs.com’, database=’managesystemv1.0′)
cursor = conn.cursor()
cursor.execute(
“SELECT correct_result FROM c_student_homework_answer_record WHERE correct_result LIKE ‘%http://image1.aixuexi.com%'”)

values = cursor.fetchall()
cursor.close()
conn.close()
values_set = set()
for value in values:
count+=1
values_set.add(value)

for value in values_set:
real_value = value[0].split(‘/’)[-1]
real_count+=1
# print(real_value)
# real_value = value[0].replace(‘http://image1.aixuexi.com/’, ”)
try:
ret, info = bucket.stat(bucket_name, real_value)
if info.status_code == 612:
sumr+=1
print(real_value)
f=open(‘goods.txt’, ‘a’)
f.write(str(real_value+’\n’))
f.close()
except BaseException as e:
print(“111”)
continue
f=open(‘goods.txt’, ‘a’)
f.write(“共计:”+str(count)+’\n’+”真实共计:”+str(real_count)+’\n’+”不存在共计:”+str(sumr))
f.close()
print(“共计:”+str(count))
print(“真实共计:”+str(real_count))
print(“不存在共计:”+str(sumr))

七牛课件重命名

# -*- coding: utf-8 -*-
# flake8: noqa
from qiniu import Auth
from qiniu import BucketManager
import mysql.connector
import time
localtime = time.asctime(time.localtime(time.time()))
current_date = time.strftime(‘%Y-%m-%d’,time.localtime(time.time()))
access_key = ‘WR5JtPxdsdYOgladSBDXZl3idmqjBzqkX5E-UDfG’
secret_key = ‘QOsocxMR2BN9eNZek3gjKeP2j_KznEl-H0JOg5g9’
# 初始化Auth状态
q = Auth(access_key, secret_key)
# 初始化BucketManager
bucket = BucketManager(q)
# 你要测试的空间, 并且这个key在你空间中存在
bucket_name = ‘axx-courseware’

conn = mysql.connector.connect(user=’aixuexidball’, password=’aixuexi_2016dball’,host=’rm-2zefci7s9f4bpy9gt.mysql.rds.aliyuncs.com’, database=’managesystemv1.0′)
cursor = conn.cursor()
cursor.execute(“select DISTINCT(filePath) from coursenote where createTime like ‘” + current_date + “%’ and filePath like ‘kjqn%’;”)

values = cursor.fetchall()
cursor.close()
conn.close()
#print(values)
for val in values:
for real_val in val:
html5_val = real_val.decode()+’/html5.html’
index_val = real_val.decode()+’/index.html’
flash_val = real_val.decode()+’/flash.html’
ret, html5_info = bucket.stat(bucket_name, html5_val)
ret, index_info = bucket.stat(bucket_name, index_val)
ret, flash_info = bucket.stat(bucket_name, flash_val)

if html5_info.status_code != 200 and index_info.status_code == 200 and flash_info.status_code == 200:
ret, info = bucket.move(bucket_name, index_val, bucket_name, html5_val)
if info.status_code == 200:
f=open(‘rename_kejian.log’, ‘a’)
f.write(localtime+” rename:”+str(index_val)+”–>”+str(html5_val)+’\n’)
f.close()
else:
f=open(‘rename_kejian.log’, ‘a’)
f.write(localtime+” faild:”+str(index_val)+”–>”+str(html5_val)+’\n’)
f.write(localtime+” error:”+info+’\n’)
f.close()

ret, info = bucket.move(bucket_name, flash_val, bucket_name, index_val)
if info.status_code == 200:
f=open(‘rename_kejian.log’, ‘a’)
f.write(localtime+” rename:”+str(flash_val)+”–>”+str(index_val)+’\n’)
f.close()
else:
f=open(‘rename_kejian.log’, ‘a’)
f.write(localtime+” faild:”+str(flash_val)+”–>”+str(index_val)+’\n’)
f.write(localtime+” error:”+info+’\n’)
f.close()
elif html5_info.status_code != 200 and index_info.status_code != 200 and flash_info.status_code != 200:
f=open(‘rename_kejian.log’, ‘a’)
f.write(localtime+” lose:”+html5_val+”&”+index_val+”&”+flash_val+’\n’)
f.close()
elif html5_info.status_code != 200 and index_info.status_code == 200 and flash_info.status_code != 200:
f=open(‘rename_kejian.log’, ‘a’)
f.write(localtime+” lose:”+html5_val+”&”+flash_val+’\n’)
f.close()
elif html5_info.status_code != 200 and index_info.status_code != 200 and flash_info.status_code == 200:
f=open(‘rename_kejian.log’, ‘a’)
f.write(localtime+” lose:”+html5_val+”&”+index_val+’\n’)
f.close()

红包随机算法的故事-阀值

场景:

最近大佬在做红包,看大佬在那想东西,然后抱着有什么问题可以帮下他也能成长自己的心理,问了一下,然后他给我出了一道题,也不告诉我啥业务,~(~ ̄▽ ̄)~ 好吧,我一开始以为简单的问题,后来发现有3个维度,还是花了1个多小时想出来,给大家分享下吧,挺有意思的。

问题 :

条件1:随机 a-b的随机数        例:3-8

条件2:随机n次                      例:50次

条件3 : n次累加之和等于s   例:250

以上 a,b,n,s  都是变量  求每一次随机的值

再次叙述:随机3到8的随机数 50 次 ,使他们累计之和等于250,求50次每一次的随机

想法:

1、实现最简单的就是 写个循环跳出条件为s,也就是s=250就跳出,循环里面写 随机a-b的随机数随机n次 ,也就是随机3-8的随机数,随机50次,然后累加放到s里,这个成功几率太低了,没准就死循环了= =,直接pass掉。

2 、后来想到了用回溯算法,简单说就是随机到最后,相加不得250 ,或者随机到第30个累加就大于250,等条件,反正确定无解之后就进行回溯,删去上一个已经随机的随机数,然后从新随机,后来想了想,不太可行,因为每次的数字是随机的嘛- – 而且回溯一般都是已知的树状求解,如果把3,4,5,6,7,8拿出来求解,那就没随机性了= =所以也pass掉了

3 、后来又想了一个,能不能把250拆分,写个递归逐步求解之类的,后来也没想好怎么拆,所以也没写出来:-D

最终,当想了好多之前学的算法之后,发现问题在于,随机50次随机数很简单,但是累加之和等于一个值,问题就来了,很有可能随机到某一次就无解了,如随机了40次就已经到230了就算最后10个随机的都是最小的3也无解,总和是260,不满足累和等于250,如何做一个控制,让他每一次既是随机的,又能最后相加一定能得250,二百五哈哈哈。。。。

解:

突然脑袋里冒出里,一个绳子拴着一条狗的图像  =  = 灵感啊! 我想到了阀值,这个名字是我取的 = =

首先声明,用这个算法其实要有一定条件的,如必须有解,必须能整除,我们假设条件是允许的。

好思路是这样的,首先用250除以50 等于5,也就是说随机50次,每次是5 ,最后能得到250,废话= = ,但是5就是阀值,这个是不变的。

还有个变量阀值,这个就向绳子一样,打个比方,第一次随机的数是3,那变量阀值就是-2,如果下次随机到7,那么变量阀值就回变成0,让变量阀值在0左右浮动。

如果随机到第49个,变量阀值是-3 ,那么随机数随机到8,也就是把上一次低于阀值5的3个加上这次该随机的5个就是8个,最后相加等于250.

变量阀值是有范围的,范围是3到-2 ,也就是说不出这个范围,当随机到最后一个数的时候,在3到8中的一个值是可以放上去相加能等于250的,当然随机到第49个的时候第50个数已经确认了。

设置一个阀值,每次判定一下随机数是否在这个阀值内,保证最后一个随机数加起来可以等于250。

总感觉没讲懂 =  = ,上代码吧

代码:

@Test

publicvoidtest2()

{

//测试100次

for(intj=0;j<100;j++){

Randomrdm=newRandom();

intbegin=1;

intend=80;

intcycleTimes=1000;

intsum=40000;

//数组放50个数用

intresult[]=newint[cycleTimes];

 

//阀值

intthreshold=sum/cycleTimes;

//最高阀值

inthigh=end-threshold;

intlow=begin-threshold;

//当前阀值

intthresholdNow=0;

 

////记录一下总的循环次数

intfroNo=0;

myJedisService.del("123456789");

for(inti=0;i<cycleTimes-1;i++){

//随机一个数

Integerrandom=rdm.nextInt(end-begin+1)+begin;

//判断是否在阈值内

intthresholdNowFor=random-threshold+thresholdNow;

if(thresholdNowFor<=high&&thresholdNowFor>=low){

thresholdNow=thresholdNowFor;

result[i]=random;

//记录一下每次成功的数字

myJedisService.hincrBy("123456789",random.toString(),1);

}else{

i=i-1;

}

//记录一下总的循环次数

froNo=froNo+1;

}

result[cycleTimes-1]=threshold-thresholdNow;

Integerasfdsdf=threshold-thresholdNow;

myJedisService.hincrBy("123456789",asfdsdf.toString(),1);

//打印用

intaaa=0;

for(inti=0;i<cycleTimes;i++){

aaa=aaa+result[i];

}

 

Map<String,String>justMap=myJedisService.hgetAll("123456789");

Integerxunhuancishu=0;

for(Map.Entry<String,String>entry:justMap.entrySet()){

System.out.println("数字:"+entry.getKey()+",循环次数"+entry.getValue());

xunhuancishu=xunhuancishu+Integer.valueOf(entry.getValue());

}

System.out.println("总数字数量:"+xunhuancishu.toString());

System.out.println("循环次数:"+froNo);

System.out.println("总:"+aaa);

}
}
注:效率超高,50个数一般就循环70次左右就能找到解,并且解有很大的随机性,良好情况下,循环次数n*1.3。

其他:

后来我又想出一个算法,这个算法比上一个随机性大,而且不容易出错,不过应该性能要求会高点,不适合a和b间隔过大,解过多的,那就是第一次用回溯算出所有的解,然后把解存起来,每一次随机一个解,这样的随机性比较大,适合a,b,n,s不变的,但是第一次效率较低,以后的效率较高。

总结:

其实后来想了一下这个算法的限制很多,虽然良好状态下循环次数n*1.3 但是如果结果太过于极端,比如 3-8 循环50 次  累加和为150 有解太极端,而且没有随机性,都是一个数,就不适合用,当阀值越接近( a+b )/2的时候效果最佳!而第二个适合必较固定的,没有一个万能的算法,只有相对的问题,用对应的解决算法,还是看需求,不过思考思考总是好的,而且阀值这个东西感觉也可以用到别的地放。

Thread的中断机制(interrupt) 建议阅读!

中断线程

线程的thread.interrupt()方法是中断线程,将会设置该线程的中断状态位,即设置为true,中断的结果线程是死亡、还是等待新的任务或是继续运行至下一步,就取决于这个程序本身。线程会不时地检测这个中断标示位,以判断线程是否应该被中断(中断标示值是否为true)。它并不像stop方法那样会中断一个正在运行的线程。

判断线程是否被中断

判断某个线程是否已被发送过中断请求,请使用Thread.currentThread().isInterrupted()方法(因为它将线程中断标示位设置为true后,不会立刻清除中断标示位,即不会将中断标设置为false),而不要使用thread.interrupted()(该方法调用后会将中断标示位清除,即重新设置为false)方法来判断,下面是线程在循环中时的中断方式:

while(!Thread.currentThread().isInterrupted() && more work to do){
    do more work
}

如何中断线程

如果一个线程处于了阻塞状态(如线程调用了thread.sleep、thread.join、thread.wait、1.5中的condition.await、以及可中断的通道上的 I/O 操作方法后可进入阻塞状态),则在线程在检查中断标示时如果发现中断标示为true,则会在这些阻塞方法(sleep、join、wait、1.5中的condition.await及可中断的通道上的 I/O 操作方法)调用处抛出InterruptedException异常,并且在抛出异常后立即将线程的中断标示位清除,即重新设置为false。抛出异常是为了线程从阻塞状态醒过来,并在结束线程前让程序员有足够的时间来处理中断请求。

 

注,synchronized在获锁的过程中是不能被中断的,意思是说如果产生了死锁,则不可能被中断(请参考后面的测试例子)。与synchronized功能相似的reentrantLock.lock()方法也是一样,它也不可中断的,即如果发生死锁,那么reentrantLock.lock()方法无法终止,如果调用时被阻塞,则它一直阻塞到它获取到锁为止。但是如果调用带超时的tryLock方法reentrantLock.tryLock(long timeout, TimeUnit unit),那么如果线程在等待时被中断,将抛出一个InterruptedException异常,这是一个非常有用的特性,因为它允许程序打破死锁。你也可以调用reentrantLock.lockInterruptibly()方法,它就相当于一个超时设为无限的tryLock方法。

 

没有任何语言方面的需求一个被中断的线程应该终止。中断一个线程只是为了引起该线程的注意,被中断线程可以决定如何应对中断。某些线程非常重要,以至于它们应该不理会中断,而是在处理完抛出的异常之后继续执行,但是更普遍的情况是,一个线程将把中断看作一个终止请求,这种线程的run方法遵循如下形式:

复制代码
public void run() {
    try {
        ...
        /*
         * 不管循环里是否调用过线程阻塞的方法如sleep、join、wait,这里还是需要加上
         * !Thread.currentThread().isInterrupted()条件,虽然抛出异常后退出了循环,显
         * 得用阻塞的情况下是多余的,但如果调用了阻塞方法但没有阻塞时,这样会更安全、更及时。
         */
        while (!Thread.currentThread().isInterrupted()&& more work to do) {
            do more work 
        }
    } catch (InterruptedException e) {
        //线程在wait或sleep期间被中断了
    } finally {
        //线程结束前做一些清理工作
    }
}
复制代码

上面是while循环在try块里,如果try在while循环里时,因该在catch块里重新设置一下中断标示,因为抛出InterruptedException异常后,中断标示位会自动清除,此时应该这样:

复制代码
public void run() {
    while (!Thread.currentThread().isInterrupted()&& more work to do) {
        try {
            ...
            sleep(delay);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();//重新设置中断标示
        }
    }
}
复制代码

底层中断异常处理方式

另外不要在你的底层代码里捕获InterruptedException异常后不处理,会处理不当,如下:

复制代码
void mySubTask(){
    ...
    try{
        sleep(delay);
    }catch(InterruptedException e){}//不要这样做
    ...
}
复制代码

如果你不知道抛InterruptedException异常后如何处理,那么你有如下好的建议处理方式:
1、在catch子句中,调用Thread.currentThread.interrupt()来设置中断状态(因为抛出异常后中断标示会被清除),让外界通过判断Thread.currentThread().isInterrupted()标示来决定是否终止线程还是继续下去,应该这样做:

复制代码
void mySubTask() {
    ...
    try {
        sleep(delay);
    } catch (InterruptedException e) {
        Thread.currentThread().isInterrupted();
    }
    ...
}
复制代码

2、或者,更好的做法就是,不使用try来捕获这样的异常,让方法直接抛出:

void mySubTask() throws InterruptedException {
    ...
    sleep(delay);
    ...
}

中断应用

使用中断信号量中断非阻塞状态的线程

中断线程最好的,最受推荐的方式是,使用共享变量(shared variable)发出信号,告诉线程必须停止正在运行的任务。线程必须周期性的核查这一变量,然后有秩序地中止任务。Example2描述了这一方式:

复制代码
class Example2 extends Thread {
    volatile boolean stop = false;// 线程中断信号量

    public static void main(String args[]) throws Exception {
        Example2 thread = new Example2();
        System.out.println("Starting thread...");
        thread.start();
        Thread.sleep(3000);
        System.out.println("Asking thread to stop...");
        // 设置中断信号量
        thread.stop = true;
        Thread.sleep(3000);
        System.out.println("Stopping application...");
    }

    public void run() {
        // 每隔一秒检测一下中断信号量
        while (!stop) {
            System.out.println("Thread is running...");
            long time = System.currentTimeMillis();
            /*
             * 使用while循环模拟 sleep 方法,这里不要使用sleep,否则在阻塞时会 抛
             * InterruptedException异常而退出循环,这样while检测stop条件就不会执行,
             * 失去了意义。
             */
            while ((System.currentTimeMillis() - time < 1000)) {}
        }
        System.out.println("Thread exiting under request...");
    }
}
复制代码

使用thread.interrupt()中断非阻塞状态线程

虽然Example2该方法要求一些编码,但并不难实现。同时,它给予线程机会进行必要的清理工作。这里需注意一点的是需将共享变量定义成volatile 类型或将对它的一切访问封入同步的块/方法(synchronized blocks/methods)中。上面是中断一个非阻塞状态的线程的常见做法,但对非检测isInterrupted()条件会更简洁:

复制代码
class Example2 extends Thread {
    public static void main(String args[]) throws Exception {
        Example2 thread = new Example2();
        System.out.println("Starting thread...");
        thread.start();
        Thread.sleep(3000);
        System.out.println("Asking thread to stop...");
        // 发出中断请求
        thread.interrupt();
        Thread.sleep(3000);
        System.out.println("Stopping application...");
    }

    public void run() {
        // 每隔一秒检测是否设置了中断标示
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println("Thread is running...");
            long time = System.currentTimeMillis();
            // 使用while循环模拟 sleep
            while ((System.currentTimeMillis() - time < 1000) ) {
            }
        }
        System.out.println("Thread exiting under request...");
    }
}
复制代码

到目前为止一切顺利!但是,当线程等待某些事件发生而被阻塞,又会发生什么?当然,如果线程被阻塞,它便不能核查共享变量,也就不能停止。这在许多情况下会发生,例如调用Object.wait()、ServerSocket.accept()和DatagramSocket.receive()时,这里仅举出一些。

 

他们都可能永久的阻塞线程。即使发生超时,在超时期满之前持续等待也是不可行和不适当的,所以,要使用某种机制使得线程更早地退出被阻塞的状态。下面就来看一下中断阻塞线程技术。

使用thread.interrupt()中断阻塞状态线程

Thread.interrupt()方法不会中断一个正在运行的线程。这一方法实际上完成的是,设置线程的中断标示位,在线程受到阻塞的地方(如调用sleep、wait、join等地方)抛出一个异常InterruptedException,并且中断状态也将被清除,这样线程就得以退出阻塞的状态。下面是具体实现:

复制代码
class Example3 extends Thread {
    public static void main(String args[]) throws Exception {
        Example3 thread = new Example3();
        System.out.println("Starting thread...");
        thread.start();
        Thread.sleep(3000);
        System.out.println("Asking thread to stop...");
        thread.interrupt();// 等中断信号量设置后再调用
        Thread.sleep(3000);
        System.out.println("Stopping application...");
    }

    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println("Thread running...");
            try {
                /*
                 * 如果线程阻塞,将不会去检查中断信号量stop变量,所 以thread.interrupt()
                 * 会使阻塞线程从阻塞的地方抛出异常,让阻塞线程从阻塞状态逃离出来,并
                 * 进行异常块进行 相应的处理
                 */
                Thread.sleep(1000);// 线程阻塞,如果线程收到中断操作信号将抛出异常
            } catch (InterruptedException e) {
                System.out.println("Thread interrupted...");
                /*
                 * 如果线程在调用 Object.wait()方法,或者该类的 join() 、sleep()方法
                 * 过程中受阻,则其中断状态将被清除
                 */
                System.out.println(this.isInterrupted());// false

                //中不中断由自己决定,如果需要真真中断线程,则需要重新设置中断位,如果
                //不需要,则不用调用
                Thread.currentThread().interrupt();
            }
        }
        System.out.println("Thread exiting under request...");
    }
}
复制代码

一旦Example3中的Thread.interrupt()被调用,线程便收到一个异常,于是逃离了阻塞状态并确定应该停止。上面我们还可以使用共享信号量来替换!Thread.currentThread().isInterrupted()条件,但不如它简洁。

死锁状态线程无法被中断

Example4试着去中断处于死锁状态的两个线程,但这两个线都没有收到任何中断信号(抛出异常),所以interrupt()方法是不能中断死锁线程的,因为锁定的位置根本无法抛出异常:

复制代码
class Example4 extends Thread {
    public static void main(String args[]) throws Exception {
        final Object lock1 = new Object();
        final Object lock2 = new Object();
        Thread thread1 = new Thread() {
            public void run() {
                deathLock(lock1, lock2);
            }
        };
        Thread thread2 = new Thread() {
            public void run() {
                // 注意,这里在交换了一下位置
                deathLock(lock2, lock1);
            }
        };
        System.out.println("Starting thread...");
        thread1.start();
        thread2.start();
        Thread.sleep(3000);
        System.out.println("Interrupting thread...");
        thread1.interrupt();
        thread2.interrupt();
        Thread.sleep(3000);
        System.out.println("Stopping application...");
    }

    static void deathLock(Object lock1, Object lock2) {
        try {
            synchronized (lock1) {
                Thread.sleep(10);// 不会在这里死掉
                synchronized (lock2) {// 会锁在这里,虽然阻塞了,但不会抛异常
                    System.out.println(Thread.currentThread());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
复制代码

中断I/O操作

然而,如果线程在I/O操作进行时被阻塞,又会如何?I/O操作可以阻塞线程一段相当长的时间,特别是牵扯到网络应用时。例如,服务器可能需要等待一个请求(request),又或者,一个网络应用程序可能要等待远端主机的响应。

 

实现此InterruptibleChannel接口的通道是可中断的:如果某个线程在可中断通道上因调用某个阻塞的 I/O 操作(常见的操作一般有这些:serverSocketChannel. accept()、socketChannel.connect、socketChannel.open、socketChannel.read、socketChannel.write、fileChannel.read、fileChannel.write)而进入阻塞状态,而另一个线程又调用了该阻塞线程的 interrupt 方法,这将导致该通道被关闭,并且已阻塞线程接将会收到ClosedByInterruptException,并且设置已阻塞线程的中断状态。另外,如果已设置某个线程的中断状态并且它在通道上调用某个阻塞的 I/O 操作,则该通道将关闭并且该线程立即接收到 ClosedByInterruptException;并仍然设置其中断状态。如果情况是这样,其代码的逻辑和第三个例子中的是一样的,只是异常不同而已。

 

如果你正使用通道(channels)(这是在Java 1.4中引入的新的I/O API),那么被阻塞的线程将收到一个ClosedByInterruptException异常。但是,你可能正使用Java1.0之前就存在的传统的I/O,而且要求更多的工作。既然这样,Thread.interrupt()将不起作用,因为线程将不会退出被阻塞状态。Example5描述了这一行为。尽管interrupt()被调用,线程也不会退出被阻塞状态,比如ServerSocket的accept方法根本不抛出异常。

 

很幸运,Java平台为这种情形提供了一项解决方案,即调用阻塞该线程的套接字的close()方法。在这种情形下,如果线程被I/O操作阻塞,当调用该套接字的close方法时,该线程在调用accept地方法将接收到一个SocketException(SocketException为IOException的子异常)异常,这与使用interrupt()方法引起一个InterruptedException异常被抛出非常相似,(注,如果是流因读写阻塞后,调用流的close方法也会被阻塞,根本不能调用,更不会抛IOExcepiton,此种情况下怎样中断?我想可以转换为通道来操作流可以解决,比如文件通道)。下面是具体实现:

复制代码
class Example6 extends Thread {
    volatile ServerSocket socket;

    public static void main(String args[]) throws Exception {
        Example6 thread = new Example6();
        System.out.println("Starting thread...");
        thread.start();
        Thread.sleep(3000);
        System.out.println("Asking thread to stop...");
        Thread.currentThread().interrupt();// 再调用interrupt方法
        thread.socket.close();// 再调用close方法
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
        }
        System.out.println("Stopping application...");
    }

    public void run() {
        try {
            socket = new ServerSocket(8888);
        } catch (IOException e) {
            System.out.println("Could not create the socket...");
            return;
        }
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println("Waiting for connection...");
            try {
                socket.accept();
            } catch (IOException e) {
                System.out.println("accept() failed or interrupted...");
                Thread.currentThread().interrupt();//重新设置中断标示位
            }
        }
        System.out.println("Thread exiting under request...");
    }
}
复制代码

———————————————————————————————————————————————————

一、没有任何语言方面的需求一个被中断的线程应该终止。中断一个线程只是为了引起该线程的注意,被中断线程可以决定如何应对中断。

二、对于处于sleep,join等操作的线程,如果被调用interrupt()后,会抛出InterruptedException,然后线程的中断标志位会由true重置为false,因为线程为了处理异常已经重新处于就绪状态。

三、不可中断的操作,包括进入synchronized段以及Lock.lock(),inputSteam.read()等,调用interrupt()对于这几个问题无效,因为它们都不抛出中断异常。如果拿不到资源,它们会无限期阻塞下去。

对于Lock.lock(),可以改用Lock.lockInterruptibly(),可被中断的加锁操作,它可以抛出中断异常。等同于等待时间无限长的Lock.tryLock(long time, TimeUnit unit)。

对于inputStream等资源,有些(实现了interruptibleChannel接口)可以通过close()方法将资源关闭,对应的阻塞也会被放开。

首先,看看Thread类里的几个方法:

public static boolean interrupted 测试当前线程是否已经中断。线程的中断状态 由该方法清除。换句话说,如果连续两次调用该方法,则第二次调用将返回 false。

public boolean isInterrupted()

测试线程是否已经中断。线程的中断状态 不受该方法的影响。

public void interrupt()

中断线程。

上面列出了与中断有关的几个方法及其行为,可以看到interrupt是中断线程。如果不了解Java的中断机制,这样的一种解释极容易造成误解,认为调用了线程的interrupt方法就一定会中断线程。

其实,Java的中断是一种协作机制。也就是说调用线程对象的interrupt方法并不一定就中断了正在运行的线程,它只是要求线程自己在合适的时机中断自己。每个线程都有一个boolean的中断状态(这个状态不在Thread的属性上),interrupt方法仅仅只是将该状态置为true。

比如对正常运行的线程调用interrupt()并不能终止他,只是改变了interrupt标示符。

一般说来,如果一个方法声明抛出InterruptedException,表示该方法是可中断的,比如wait,sleep,join,也就是说可中断方法会对interrupt调用做出响应(例如sleep响应interrupt的操作包括清除中断状态,抛出InterruptedException),异常都是由可中断方法自己抛出来的,并不是直接由interrupt方法直接引起的。

Object.wait, Thread.sleep方法,会不断的轮询监听 interrupted 标志位,发现其设置为true后,会停止阻塞并抛出 InterruptedException异常。

————————————————————————————————————————————————————–

看了以上的说明,对java中断的使用肯定是会了,但我想知道的是阻塞了的线程是如何通过interuppt方法完成停止阻塞并抛出interruptedException的,这就要看Thread中native的interuppt0方法了。

第一步学习Java的JNI调用Native方法。

 

第二步下载openjdk的源代码,找到目录结构里的openjdk-src\jdk\src\share\native\java\lang\Thread.c文件。

复制代码
#include "jni.h"
#include "jvm.h"

#include "java_lang_Thread.h"

#define THD "Ljava/lang/Thread;"
#define OBJ "Ljava/lang/Object;"
#define STE "Ljava/lang/StackTraceElement;"

#define ARRAY_LENGTH(a) (sizeof(a)/sizeof(a[0]))

static JNINativeMethod methods[] = {
    {"start0",           "()V",        (void *)&JVM_StartThread},
    {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
    {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
    {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
    {"resume0",          "()V",        (void *)&JVM_ResumeThread},
    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
    {"yield",            "()V",        (void *)&JVM_Yield},
    {"sleep",            "(J)V",       (void *)&JVM_Sleep},
    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
    {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},
    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},
    {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},
    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
    {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
};

#undef THD
#undef OBJ
#undef STE

JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
    (*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}

为什么 Java 要把字符串设计成不可变的

String是Java中一个不可变的类,所以他一旦被实例化就无法被修改。不可变类的实例一旦创建,其成员变量的值就不能被修改。不可变类有很多优势。本文总结了为什么字符串被设计成不可变的。将涉及到内存、同步和数据结构相关的知识。

字符串池

字符串池是方法区中的一部分特殊存储。当一个字符串被被创建的时候,首先会去这个字符串池中查找,如果找到,直接返回对该字符串的引用。

下面的代码只会在堆中创建一个字符串

String string1 = “abcd”;
String string2 = “abcd”;

那么在堆里面,只有一个对象”abcd”

如果字符串可变的话,当两个引用指向指向同一个字符串时,对其中一个做修改就会影响另外一个。(请记住该影响,有助于理解后面的内容)

缓存Hashcode

Java中经常会用到字符串的哈希码(hashcode)。例如,在HashMap中,字符串的不可变能保证其hashcode永远保持一致,这样就可以避免一些不必要的麻烦。这也就意味着每次在使用一个字符串的hashcode的时候不用重新计算一次,这样更加高效。

在String类中,有以下代码:

private int hash;//this is used to cache hash code.

以上代码中hash变量中就保存了一个String对象的hashcode,因为String类不可变,所以一旦对象被创建,该hash值也无法改变。所以,每次想要使用该对象的hashcode的时候,直接返回即可。

使其他类的使用更加便利

在介绍这个内容之前,先看以下代码:

HashSet set = new HashSet();
set.add(new String(“a”));
set.add(new String(“b”));
set.add(new String(“c”));

for(String a: set)
a.value = “a”;

在上面的例子中,如果字符串可以被改变,那么以上用法将有可能违反Set的设计原则,因为Set要求其中的元素不可以重复。上面的代码只是为了简单说明该问题,其实String类中并没有value这个字段值。

安全性

String被广泛的使用在其他Java类中充当参数。比如网络连接、打开文件等操作。如果字符串可变,那么类似操作可能导致安全问题。因为某个方法在调用连接操作的时候,他认为会连接到某台机器,但是实际上并没有(其他引用同一String对象的值修改会导致该连接中的字符串内容被修改)。可变的字符串也可能导致反射的安全问题,因为他的参数也是字符串。

代码示例:

boolean connect(string s){
if (!isSecure(s)) {
throw new SecurityException();
}
//如果s在该操作之前被其他的引用所改变,那么就可能导致问题。
causeProblem(s);
}

不可变对象天生就是线程安全的

因为不可变对象不能被改变,所以他们可以自由地在多个线程之间共享。不需要任何同步处理。

总之,String被设计成不可变的主要目的是为了安全和高效。所以,使String是一个不可变类是一个很好的设计。

iptables规则

iptables包括配置iptables三个链条的默认规则、添加iptables规则、修改规则、删除规则等。
一、查看规则集
iptables –list -n // 加一个-n以数字形式显示IP和端口,看起来更舒服
二、配置默认规则
iptables -P INPUT DROP // 不允许进
iptables -P FORWARD DROP // 不允许转发
iptables -P OUTPUT ACCEPT // 允许出
三、增加规则
iptables -A INPUT -s 192.168.0.0/24 -j ACCEPT
//允许源IP地址为192.168.0.0/24网段的包流进(包括所有的协议,这里也可以指定单个IP)
iptables -A INPUT -d 192.168.0.22 -j ACCEPT
//允许所有的IP到192.168.0.22的访问
iptables -A INPUT -p tcp –dport 80 -j ACCEPT
//开放本机80端口
iptables -A INPUT -p icmp –icmp-type echo-request -j ACCEPT
//开放本机的ICMP协议
四、删除规则
iptables -D INPUT -s 192.168.0.21 -j ACCEPT
//删除刚才建立的第一条规则
五、规则的保存
iptables -F
//清空规则缓冲区(这个操作会将上面的增加操作全部清空,若须保留建议先执行一下句:保存)
service iptables save
//将规则保存在/etc/sysconfig/iptables文件里
service iptables restart
//重启Iptables服务
最后说明一下,iptables防火墙的配置文件存放于:/etc/sysconfig/iptables

tomcat重启脚本

#!/bin/bash
RED=’\E[1;31m’
GRN=’\E[1;32m’
RES=’\E[0m’
tomcat_name=”tomcat-guanli”
tomcat_log=”/project/opt/${tomcat_name}/logs/catalina.out”
tomcat_start=”/project/opt/${tomcat_name}/bin/startup.sh”
tomcat_pid=`ps aux | grep ${tomcat_name} | grep -v grep | awk ‘{print $2}’`
if [ ! ${tomcat_pid} ];then
echo -e “${RED}tomcat not start…${RES}”
echo -e “${RED}please start tomcat…${RES}”
echo -e “${GRN}start ${tomcat_name}now…${RES}”
sleep 1
sh $tomcat_start ; tail -f $tomcat_log
else
echo -e “${GRN}tomcat pid is ${tomcat_pid} ${RES}”
echo -e “${RED}kill ${tomcat_name}…${RES}”
kill -9 $tomcat_pid
sleep 1
echo -e “${GRN}start ${tomcat_name}…${RES}”
sh $tomcat_start ;tail -f $tomcat_log
fi

mysql5.6自动化安装脚本

大家在使用的时候需要根据实际情况来调整路径变量

#!/bin/bash

#制作人:张阳
#时间:2016/2/25 14:25

#定义颜色变量
RED_COLOR=’\E[1;31m’ #红
GREEN_COLOR=’\E[1;32m’ #绿
YELOW_COLOR=’\E[1;33m’ #黄
BLUE_COLOR=’\E[1;34m’ #蓝
PINK=’\E[1;35m’ #粉红
RES=’\E[0m’

#定义脚本内部变量
SOFT_DIR=/soft
MYSQL_NAME=”mysql-5.6.29.tar.gz”
USER=`cat /etc/passwd | grep ^mysql | wc -l`
MYSQL_INSTALL_DIR=/usr/local/mysql
MY_CNF=`ls /etc/my.cnf |wc -l`
SRC=”/usr/src”

check_environment(){
yum install make gcc-c++ cmake bison-devel ncurses-devel -y

}

check_install(){
if [ -d $SOFT_DIR ];then
cd $SOFT_DIR
if [ -f $MYSQL_NAME ];then
tar xf $MYSQL_NAME -C $SRC
cd $SRC/mysql-5.6.29
cmake \
-DCMAKE_INSTALL_PREFIX=$MYSQL_INSTALL_DIR \
-DMYSQL_DATADIR=$MYSQL_INSTALL_DIR/data \
-DSYSCONFDIR=/etc \
-DWITH_MYISAM_STORAGE_ENGINE=1 \
-DWITH_INNOBASE_STORAGE_ENGINE=1 \
-DWITH_MEMORY_STORAGE_ENGINE=1 \
-DWITH_READLINE=1 \
-DMYSQL_UNIX_ADDR=/var/lib/mysql/mysql.sock \
-DMYSQL_TCP_PORT=3306 \
-DENABLED_LOCAL_INFILE=1 \
-DWITH_PARTITION_STORAGE_ENGINE=1 \
-DEXTRA_CHARSETS=all \
-DDEFAULT_CHARSET=utf8 \
-DDEFAULT_COLLATION=utf8_general_ci
if [ $? -eq 0 ];then
make
if [ $? -eq 0 ];then
make install
if [ $? -eq 0 ];then
echo -e “$GREEN_COLOR 软件已安装完成 $RES”

else
echo -e “$RED_COLOR make install error $RES”
exit 1
fi
else
echo -e “$RED_COLOR make error $RES”
exit 1
fi
else
echo -e “$RED_COLOR configure error $RES”
exit 1
fi
else
echo -e “$RED_COLOR 没有文件,请检查文件或文件名是否正确 $RES”
exit 1
fi
else
echo -e “$RED_COLOR 没有目录 $RES”
exit 1
fi
}

check_user(){
if [ $USER -eq 1 ];then
useradd -g mysql -s /sbin/nologin mysql
else
groupadd mysql
useradd -g mysql -s /sbin/nologin mysql
fi
}

check_perm(){
chown -R mysql.mysql $MYSQL_INSTALL_DIR
}

mysql_configure(){
cd $MYSQL_INSTALL_DIR
scripts/mysql_install_db –basedir=$MYSQL_INSTALL_DIR –datadir=$MYSQL_INSTALL_DIR/data –user=mysql
}

mysql_init(){
cp $MYSQL_INSTALL_DIR/support-files/mysql.server /etc/init.d/mysql
if [ $MY_CNF -eq 0 ];then
cp $MYSQL_INSTALL_DIR/my.cnf /etc/my.cnf
else
rm -rf /etc/my.cnf
cp $MYSQL_INSTALL_DIR/my.cnf /etc/my.cnf
fi
chkconfig mysql on
/etc/init.d/mysql start
if [ $? -eq 0 ];then
echo -e “$GREEN_COLOR mysql安装完成 $RES”
else
echo -e “$RED_COLOR mysql 安装失败,请检查原因 $RES”
fi
}

check_path(){
cat >>/etc/profile <<EOF
export PATH=/usr/local/mysql/bin:$PATH
export PATH=/usr/local/mysql/sbin:$PATH
EOF
sleep 1
source /etc/profile
}

main(){
check_environment
check_install
check_user
check_perm
mysql_configure
mysql_init
check_path
}

echo -ne “$GREEN_COLOR 在安装之前确认是否有${SOFT_DIR}这个目录,是否开始安装:Y/N $RES”

read A

case $A in
Y)
main;;
N)
echo -e “$RED_COLOR 退出安装 $RES”
break;;
*)
echo -e “$RED_COLOR 非法输入,请输入大写的Y/N $RES”
break
esac

Java的Fork/Join任务,你写对了吗?

当我们需要执行大量的小任务时,有经验的Java开发人员都会采用线程池来高效执行这些小任务。然而,有一种任务,例如,对超过1000万个元素的数组进行排序,这种任务本身可以并发执行,但如何拆解成小任务需要在任务执行的过程中动态拆分。这样,大任务可以拆成小任务,小任务还可以继续拆成更小的任务,最后把任务的结果汇总合并,得到最终结果,这种模型就是Fork/Join模型。

Java7引入了Fork/Join框架,我们通过RecursiveTask这个类就可以方便地实现Fork/Join模式。

例如,对一个大数组进行并行求和的RecursiveTask,就可以这样编写:

class SumTask extends RecursiveTask<Long> {

    static final int THRESHOLD = 100;
    long[] array;
    int start;
    int end;

    SumTask(long[] array, int start, int end) {
    this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // 如果任务足够小,直接计算:
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println(String.format("compute %d~%d = %d", start, end, sum));
            return sum;
        }
        // 任务太大,一分为二:
        int middle = (end + start) / 2;
        System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
        SumTask subtask1 = new SumTask(this.array, start, middle);
        SumTask subtask2 = new SumTask(this.array, middle, end);
        invokeAll(subtask1, subtask2);
        Long subresult1 = subtask1.join();
        Long subresult2 = subtask2.join();
        Long result = subresult1 + subresult2;
        System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
        return result;
    }
}

编写这个Fork/Join任务的关键在于,在执行任务的compute()方法内部,先判断任务是不是足够小,如果足够小,就直接计算并返回结果(注意模拟了1秒延时),否则,把自身任务一拆为二,分别计算两个子任务,再返回两个子任务的结果之和。

最后写一个main()方法测试:

public static void main(String[] args) throws Exception {
    // 创建随机数组成的数组:
    long[] array = new long[400];
    fillRandom(array);
    // fork/join task:
    ForkJoinPool fjp = new ForkJoinPool(4); // 最大并发数4
    ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
    long startTime = System.currentTimeMillis();
    Long result = fjp.invoke(task);
    long endTime = System.currentTimeMillis();
    System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
}

关键代码是fjp.invoke(task)来提交一个Fork/Join任务并发执行,然后获得异步执行的结果。

我们设置任务的最小阀值是100,当提交一个400大小的任务时,在4核CPU上执行,会一分为二,再二分为四,每个最小子任务的执行时间是1秒,由于是并发4个子任务执行,整个任务最终执行时间大约为1秒。

新手在编写Fork/Join任务时,往往用搜索引擎搜到一个例子,然后就照着例子写出了下面的代码:

protected Long compute() {
    if (任务足够小?) {
        return computeDirect();
    }
    // 任务太大,一分为二:
    SumTask subtask1 = new SumTask(...);
    SumTask subtask2 = new SumTask(...);
    // 分别对子任务调用fork():
    subtask1.fork();
    subtask2.fork();
    // 合并结果:
    Long subresult1 = subtask1.join();
    Long subresult2 = subtask2.join();
    return subresult1 + subresult2;
}

很遗憾,这种写法是错!误!的!这样写没有正确理解Fork/Join模型的任务执行逻辑。

JDK用来执行Fork/Join任务的工作线程池大小等于CPU核心数。在一个4核CPU上,最多可以同时执行4个子任务。对400个元素的数组求和,执行时间应该为1秒。但是,换成上面的代码,执行时间却是两秒。

这是因为执行compute()方法的线程本身也是一个Worker线程,当对两个子任务调用fork()时,这个Worker线程就会把任务分配给另外两个Worker,但是它自己却停下来等待不干活了!这样就白白浪费了Fork/Join线程池中的一个Worker线程,导致了4个子任务至少需要7个线程才能并发执行。

打个比方,假设一个酒店有400个房间,一共有4名清洁工,每个工人每天可以打扫100个房间,这样,4个工人满负荷工作时,400个房间全部打扫完正好需要1天。

Fork/Join的工作模式就像这样:首先,工人甲被分配了400个房间的任务,他一看任务太多了自己一个人不行,所以先把400个房间拆成两个200,然后叫来乙,把其中一个200分给乙。

紧接着,甲和乙再发现200也是个大任务,于是甲继续把200分成两个100,并把其中一个100分给丙,类似的,乙会把其中一个100分给丁,这样,最终4个人每人分到100个房间,并发执行正好是1天。

如果换一种写法:

// 分别对子任务调用fork():
subtask1.fork();
subtask2.fork();

这个任务就分!错!了!

比如甲把400分成两个200后,这种写法相当于甲把一个200分给乙,把另一个200分给丙,然后,甲成了监工,不干活,等乙和丙干完了他直接汇报工作。乙和丙在把200分拆成两个100的过程中,他俩又成了监工,这样,本来只需要4个工人的活,现在需要7个工人才能1天内完成,其中有3个是不干活的。

其实,我们查看JDK的invokeAll()方法的源码就可以发现,invokeAll的N个任务中,其中N-1个任务会使用fork()交给其它线程执行,但是,它还会留一个任务自己执行,这样,就充分利用了线程池,保证没有空闲的不干活的线程。

CAT源码解析

CAT(Central Application Tracking)是一个实时和接近全量的监控系统,它侧重于对Java应用的监控,基本接入了美团点评上海侧所有核心应用。目前在中间件(MVC、RPC、数据库、缓存等)框架中得到广泛应用,为美团点评各业务线提供系统的性能指标、健康状况、监控告警等。自2014年开源以来,除了美团点评之外,CAT还在携程、陆金所、猎聘网、找钢网等多家互联网公司生产环境应用,项目的开源地址是http://github.com/dianping/cat

本文会对CAT整体设计、客户端、服务端等的一些设计思路做详细深入的介绍。

背景介绍

CAT整个产品研发是从2011年底开始的,当时正是大众点评从.NET迁移到Java的核心起步阶段。当初大众点评已经有核心的基础中间件、RPC组件Pigeon、统一配置组件Lion。整体Java迁移已经在服务化的路上。随着服务化的深入,整体Java在线上部署规模逐渐变多,同时,暴露的问题也越来越多。典型的问题有:

  • 大量报错,特别是核心服务,需要花很久时间才能定位。
  • 异常日志都需要线上权限登陆线上机器排查,排错时间长。
  • 有些简单的错误定位都非常困难(一次将线上的库配置到了Beta,花了整个通宵排错)。
  • 很多不了了之的问题怀疑是网络问题(从现在看,内网真的很少出问题)。

虽然那时候也有一些简单的监控工具(比如Zabbix,自己研发的Hawk系统等),可能单个工具在某方面的功能还不错,但整体服务化水平参差不齐、扩展能力相对较弱,监控工具间不能互通互联,使得查找问题根源基本都需要在多个系统之间切换,有时候真的是靠“人品”才能找出根源。

适逢在eBay工作长达十几年的吴其敏加入大众点评成为首席架构师,他对eBay内部应用非常成功的CAL系统有深刻的理解。就在这样天时地利人和的情况下,我们开始研发了大众点评第一代监控系统——CAT。

CAT的原型和理念来源于eBay的CAL系统,最初是吴其敏在大众点评工作期间设计开发的。他之前曾CAT不仅增强了CAL系统核心模型,还添加了更丰富的报表。

整体设计

监控整体要求就是快速发现故障、快速定位故障以及辅助进行程序性能优化。为了做到这些,我们对监控系统的一些非功能做了如下的要求:

  • 实时处理:信息的价值会随时间锐减,尤其是事故处理过程中。
  • 全量数据:最开始的设计目标就是全量采集,全量的好处有很多。
  • 高可用:所有应用都倒下了,需要监控还站着,并告诉工程师发生了什么,做到故障还原和问题定位。
  • 故障容忍:CAT本身故障不应该影响业务正常运转,CAT挂了,应用不该受影响,只是监控能力暂时减弱。
  • 高吞吐:要想还原真相,需要全方位地监控和度量,必须要有超强的处理吞吐能力。
  • 可扩展:支持分布式、跨IDC部署,横向扩展的监控系统。
  • 不保证可靠:允许消息丢失,这是一个很重要的trade-off,目前CAT服务端可以做到4个9的可靠性,可靠系统和不可靠性系统的设计差别非常大。

CAT从开发至今,一直秉承着简单的架构就是最好的架构原则,主要分为三个模块:CAT-client、CAT-consumer、CAT-home。

  • Cat-client 提供给业务以及中间层埋点的底层SDK。
  • Cat-consumer 用于实时分析从客户端提供的数据。
  • Cat-home 作为用户给用户提供展示的控制端。

在实际开发和部署中,Cat-consumer和Cat-home是部署在一个JVM内部,每个CAT服务端都可以作为consumer也可以作为home,这样既能减少整个层级结构,也可以增加系统稳定性。

上图是CAT目前多机房的整体结构图,图中可见:

  • 路由中心是根据应用所在机房信息来决定客户端上报的CAT服务端地址,目前美团点评有广州、北京、上海三地机房。
  • 每个机房内部都有独立的原始信息存储集群HDFS。
  • CAT-home可以部署在一个机房也可以部署在多个机房,在最后做展示的时候,home会从consumer中进行跨机房的调用,将所有的数据合并展示给用户。
  • 实际过程中,consumer、home以及路由中心都是部署在一起的,每个服务端节点都可以充当任何一个角色。
客户端设计

客户端设计是CAT系统设计中最为核心的一个环节,客户端要求是做到API简单、高可靠性能,无论在任何场景下都不能影响客业务性能,监控只是公司核心业务流程一个旁路环节。CAT核心客户端是Java,也支持Net客户端,近期公司内部也在研发其他多语言客户端。以下客户端设计及细节均以Java客户端为模板。

设计架构

CAT客户端在收集端数据方面使用ThreadLocal(线程局部变量),是线程本地变量,也可以称之为线程本地存储。其实ThreadLocal的功用非常简单,就是为每一个使用该变量的线程都提供一个变量值的副本,属于Java中一种较为特殊的线程绑定机制,每一个线程都可以独立地改变自己的副本,不会和其它线程的副本冲突。

在监控场景下,为用户提供服务都是Web容器,比如tomcat或者Jetty,后端的RPC服务端比如Dubbo或者Pigeon,也都是基于线程池来实现的。业务方在处理业务逻辑时基本都是在一个线程内部调用后端服务、数据库、缓存等,将这些数据拿回来再进行业务逻辑封装,最后将结果展示给用户。所以将所有的监控请求作为一个监控上下文存入线程变量就非常合适。

如上图所示,业务执行业务逻辑的时候,就会把此次请求对应的监控存放于线程上下文中,存于上下文的其实是一个监控树的结构。在最后业务线程执行结束时,将监控对象存入一个异步内存队列中,CAT有个消费线程将队列内的数据异步发送到服务端。

API设计

监控API定义往往取决于对监控或者性能分析这个领域的理解,监控和性能分析所针对的场景有如下几种:

  • 一段代码的执行时间,一段代码可以是URL执行耗时,也可以是SQL的执行耗时。
  • 一段代码的执行次数,比如Java抛出异常记录次数,或者一段逻辑的执行次数。
  • 定期执行某段代码,比如定期上报一些核心指标:JVM内存、GC等指标。
  • 关键的业务监控指标,比如监控订单数、交易额、支付成功率等。

在上述领域模型的基础上,CAT设计自己核心的几个监控对象:Transaction、Event、Heartbeat、Metric。

一段监控API的代码示例如下:

序列化和通信

序列化和通信是整个客户端包括服务端性能里面很关键的一个环节。

  • CAT序列化协议是自定义序列化协议,自定义序列化协议相比通用序列化协议要高效很多,这个在大规模数据实时处理场景下还是非常有必要的。
  • CAT通信是基于Netty来实现的NIO的数据传输,Netty是一个非常好的NIO开发框架,在这边就不详细介绍了。

客户端埋点

日志埋点是监控活动的最重要环节之一,日志质量决定着监控质量和效率。当前CAT的埋点目标是以问题为中心,像程序抛出exception就是典型问题。我个人对问题的定义是:不符合预期的就可以算问题,比如请求未完成、响应时间快了慢了、请求TPS多了少了、时间分布不均匀等等。

在互联网环境中,最突出的问题场景,突出的理解是:跨越边界的行为。包括但不限于:

  • HTTP/REST、RPC/SOA、MQ、Job、Cache、DAL;
  • 搜索/查询引擎、业务应用、外包系统、遗留系统;
  • 第三方网关/银行, 合作伙伴/供应商之间;
  • 各类业务指标,如用户登录、订单数、支付状态、销售额。

遇到的问题

通常Java客户端在业务上使用容易出问题的地方就是内存,另外一个是CPU。内存往往是内存泄露,占用内存较多导致业务方GC压力增大; CPU开销最终就是看代码的性能。

以前我们遇到过一个极端的例子,我们一个业务请求做餐饮加商铺的销售额,业务一般会通过for循环所有商铺的分店,结果就造成内存OOM了,后来发现这家店是肯德基,有几万分店,每个循环里面都会有数据库连接。在正常场景下,ThreadLocal内部的监控一个对象就存在几万个节点,导致业务Oldgc特别严重。所以说框架的代码是不能想象业务方会怎么用你的代码,需要考虑到任何情况下都有出问题的可能。

在消耗CPU方面我们也遇到一个case:在某个客户端版本,CAT本地存储当前消息ID自增的大小,客户端使用了MappedByteBuffer这个类,这个类是一个文件内存映射,测试下来这个类的性能非常高,我们仅仅用这个存储了几个字节的对象,正常情况理论上不会有任何问题。在一次线上场景下,很多业务线程都block在这个上面,结果发现当本身这台机器IO存在瓶颈时候,这个也会变得很慢。后来的优化就是把这个IO的操作异步化,所以客户端需要尽可能异步化,异步化序列化、异步化传输、异步化任何可能存在时间延迟的代码操作。

服务端设计

服务端主要的问题是大数据的实时处理,目前后端CAT的计算集群大约35台物理机,存储集群大约35台物理机,每天处理了约100TB的数据量。线上单台机器高峰期大约是110MB/s,接近千兆网打满。

下面我重点讲下CAT服务端一些设计细节。

架构设计

在最初的整体介绍中已经画了架构图,这边介绍下单机的consumer中大概的结构如下:

如上图,CAT服务端在整个实时处理中,基本上实现了全异步化处理。

  • 消息接受是基于Netty的NIO实现。
  • 消息接受到服务端就存放内存队列,然后程序开启一个线程会消费这个消息做消息分发。
  • 每个消息都会有一批线程并发消费各自队列的数据,以做到消息处理的隔离。
  • 消息存储是先存入本地磁盘,然后异步上传到HDFS文件,这也避免了强依赖HDFS。

当某个报表处理器处理来不及时候,比如Transaction报表处理比较慢,可以通过配置支持开启多个Transaction处理线程,并发消费消息。

实时分析

CAT服务端实时报表分析是整个监控系统的核心,CAT重客户端采集的是是原始的logview,目前一天大约有1000亿的消息,这些原始的消息太多了,所以需要在这些消息基础上实现丰富报表,来支持业务问题及性能分析的需要。

CAT是根据日志消息的特点(比如只读特性)和问题场景,量身定做的,它将所有的报表按消息的创建时间,一小时为单位分片,那么每小时就产生一个报表。当前小时报表的所有计算都是基于内存的,用户每次请求即时报表得到的都是最新的实时结果。对于历史报表,因为它是不变的,所以实时不实时也就无所谓了。

CAT基本上所有的报表模型都可以增量计算,它可以分为:计数、计时和关系处理三种。计数又可以分为两类:算术计数和集合计数。典型的算术计数如:总个数(count)、总和(sum)、均值(avg)、最大/最小(max/min)、吞吐(tps)和标准差(std)等,其他都比较直观,标准差稍微复杂一点,大家自己可以推演一下怎么做增量计算。那集合运算,比如95线(表示95%请求的完成时间)、999线(表示99.9%请求的完成时间),则稍微复杂一些,系统开销也更大一点。

报表建模

CAT每个报表往往有多个维度,以transaction报表为例,它有5个维度,分别是应用、机器、Type、Name和分钟级分布情况。如果全维度建模,虽然灵活,但开销将会非常之大。CAT选择固定维度建模,可以理解成将这5个维度组织成深度为5的树,访问时总是从根开始,逐层往下进行。

CAT服务端为每个报表单独分配一个线程,所以不会有锁的问题,所有报表模型都是非线程安全的,其数据是可变的。这样带来的好处是简单且低开销。

CAT报表建模是使用自研的Maven Plugin自动生成的。所有报表是可合并和裁剪的,可以轻易地将2个或多个报表合并成一个报表。在报表处理代码中,CAT大量使用访问者模式(visitor pattern)。

性能分析报表

故障发现报表

  • 实时业务指标监控 :核心业务都会定义自己的业务指标,这不需要太多,主要用于24小时值班监控,实时发现业务指标问题,图中一个是当前的实际值,一个是基准值,就是根据历史趋势计算的预测值。如下图就是当时的情景,能直观看到支付业务出问题的故障。
  • 系统报错大盘。
  • 实时数据库大盘、服务大盘、缓存大盘等。

存储设计

CAT系统的存储主要有两块:

  • CAT的报表的存储。
  • CAT原始logview的存储。

报表是根据logview实时运算出来的给业务分析用的报表,默认报表有小时模式、天模式、周模式以及月模式。CAT实时处理报表都是产生小时级别统计,小时级报表中会带有最低分钟级别粒度的统计。天、周、月等报表都是在小时级别报表合并的结果报表。

原始logview存储一天大约100TB的数据量,因为数据量比较大所以存储必须要要压缩,本身原始logview需要根据Message-ID读取,所以存储整体要求就是批量压缩以及随机读。在当时场景下,并没有特别合适成熟的系统以支持这样的特性,所以我们开发了一种基于文件的存储以支持CAT的场景,在存储上一直是最难的问题,我们一直在这块持续的改进和优化。

消息ID的设计

CAT每个消息都有一个唯一的ID,这个ID在客户端生成,后续都通过这个ID在进行消息内容的查找。典型的RPC消息串起来的问题,比如A调用B的时候,在A这端生成一个Message-ID,在A调用B的过程中,将Message-ID作为调用传递到B端,在B执行过程中,B用context传递的Message-ID作为当前监控消息的Message-ID。

CAT消息的Message-ID格式ShopWeb-0a010680-375030-2,CAT消息一共分为四段:

  • 第一段是应用名shop-web。
  • 第二段是当前这台机器的IP的16进制格式,01010680表示10.1.6.108。
  • 第三段的375030,是系统当前时间除以小时得到的整点数。
  • 第四段的2,是表示当前这个客户端在当前小时的顺序递增号。

存储数据的设计

消息存储是CAT最有挑战的部分。关键问题是消息数量多且大,目前美团点评每天处理消息1000亿左右,大小大约100TB,单物理机高峰期每秒要处理100MB左右的流量。CAT服务端基于此流量做实时计算,还需要将这些数据压缩后写入磁盘。

整体存储结构如下图:

CAT在写数据一份是Index文件,一份是Data文件.

  • Data文件是分段GZIP压缩,每个分段大小小于64K,这样可以用16bits可以表示一个最大分段地址。
  • 一个Message-ID都用需要48bits的大小来存索引,索引根据Message-ID的第四段来确定索引的位置,比如消息Message-ID为ShopWeb-0a010680-375030-2,这条消息ID对应的索引位置为2*48bits的位置。
  • 48bits前面32bits存数据文件的块偏移地址,后面16bits存数据文件解压之后的块内地址偏移。
  • CAT读取消息的时候,首先根据Message-ID的前面三段确定唯一的索引文件,在根据Message-ID第四段确定此Message-ID索引位置,根据索引文件的48bits读取数据文件的内容,然后将数据文件进行GZIP解压,在根据块内便宜地址读取出真正的消息内容。

服务端设计总结

CAT在分布式实时方面,主要归结于以下几点因素:

  • 去中心化,数据分区处理。
  • 基于日志只读特性,以一个小时为时间窗口,实时报表基于内存建模和分析,历史报表通过聚合完成。
  • 基于内存队列,全面异步化、单线程化、无锁设计。
  • 全局消息ID,数据本地化生产,集中式存储。
  • 组件化、服务化理念。

总结感悟

最后我们再花一点点时间来讲一下我们在实践里做的一些东西。

一、MVP版本,Demo版本用了1个月,MVP版本用了3个月。

为什么强调MVP版本?因为做这个项目需要老板和业务的支持。大概在2011年左右,我们整个生产环境估计也有一千台机器(虚拟机),一旦出现问题就到运维那边看日志,看日志的痛苦大家都应该理解,这时候发现一台机器核心服务出错,可能会导致更多的问题。我们就做了MVP版本解决这个问题,当时我们大概做了两个功能:一个是实时知道所有的API接口访问量成功率等;第二是实时能在CAT平台上看到异常日志。这里我想说的是MVP版本不要做太多内容,但是在做一个产品的时候必须从MVP版本做起,要做一些最典型特别亮眼的功能让大家支持你。

二、数据质量。数据质量是整个监控体系里面非常关键,它决定你最后的监控报表质量。所以我们要和跟数据库框架、缓存框架、RPC框架、Web框架等做深入的集成,让业务方便收集以及看到这些数据。

三、单机开发环境,这也是我们认为对整个项目开发效率提升最重要的一点。单机开发环境实际上就是说你在一台机器里可以把你所有的项目都启起来。如果你在一个单机环境下把所有东西启动起来,你就会想方设法地知道我依赖的服务挂了我怎么办?比如CAT依赖了HDFS。单机开发环境除了大幅度提高你的项目开发效率之外,还能提升你整个项目的可靠性。

四、最难的事情是项目上线推动。CAT整个项目大概有两三个人,当时白天都是支持业务上线,培训,晚上才能code,但是一旦随着产品和完善以及业务使用逐渐变多,一些好的产品后面会形成良性循环,推广就会变得比较容易。

五、开放生态。公司越大监控的需求越多,报表需求也更多,比如我们美团点评,产品有很多报表,整个技术体系里面也有很多报表非常多的自定义报表,很多业务方都提各自的需求。最后我们决定把整个CAT系统里面所有的数据都作为API暴露出去,这些需求并不是不能支持,而是这事情根本是做不完的。美团点评内部下游有很多系统依赖CAT的数据,来做进一步的报表展示。