多线程排序

在《系统编程-多线程》中已经了解了多线程的一些特点,其中包括快!那么今天就来看看如何利用多线程来排序。

思路

我们的思路是这样的:

  • 假设有N个线程,则将数组数M据分为N组
  • 每个线程对其中的一组数据使用库函数提供的快速排序算法
  • 所有线程排序完成后,将每组排序好的数组合并

举个例子,使用4个线程对11个数据进行排序:

1
12,10,4,7,9,6,8,1,5,16,11

由于4不能被10整除,因此,前面三个线程,每个负责排序10%(4-1)= 3三个数,最后一个线程负责排序最后两个数。

线程0 线程1 线程2 线程3
12,10,4 7,9,6 8,1,5 16,11

假设这4个线程都完成了自己的工作后,内容如下:

线程0 线程1 线程2 线程3
4,10,12 6,7,9 1,5,8 11,16

最后由主线程将已经排好的每组进行合并:

  • 比较每组的第一个,选出最小的一个,这里是线程2的1,放到新数组的下标0处
  • 将1放到新的数组最开始的位置,线程的下次计较的内容后移,即下次比较时,比较线程2的第二个数。
  • 循环比较

最终可以得到合并的数据:

1
1 4 5 6 7 8 9 10 11 12 16

屏障

通过上面的分析,我们需要多个线程进行排序后,一起交给主线程合并,因此需要有方法等待所有线程完成事情之后,再退出。
在《系统编程-多线程》中介绍了pthread_join,今天我们使用pthread_barrier_wait。

1
2
3
4
5
#include <pthread.h>
int pthread_barrier_destroy(pthread_barrier_t *barrier);
int pthread_barrier_init(pthread_barrier_t *restrict barrier,
const pthread_barrierattr_t *restrict attr, unsigned count);
int pthread_barrier_wait(pthread_barrier_t *barrier);

在解释之前说明一下基本原理,pthread_barrier_wait等待某一个条件达到(计数到达),一旦达到后就会继续往后执行。当然了,如果你希望各个线程完成它自己的工作,主线程再进行合并动作,则你等待的数量可以再加一个。:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//来源:公众号【编程珠玑】
//https://www.yanbinghu.com
//barrier.c
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
pthread_barrier_t b;
void *workThread(void * arg)
{
printf("thread %d\n",*(int*)arg);
pthread_barrier_wait(&b);
return (void*)0;
}
int main(void)
{
int threadNum = 4;
int err;
/*计数为创建线程数+1*/
pthread_barrier_init(&b,NULL,threadNum + 1);
int i = 0;
pthread_t tid;
/*创建多个线程*/
for(i = 0;i < threadNum; i++)
{
err = pthread_create(&tid,NULL,workThread,(void*)&i);
if(0 != err)
{
printf("create thread failed\n");
return -1;
}
printf("tid:%ld\n",tid);
usleep(10000);
}
pthread_barrier_wait(&b);
printf("all thread finished\n");
/*销毁*/
pthread_barrier_destroy(&b);
return 0;
}

其中,pthread_barrier_init用来初始化相关资源,而pthread_barrier_destroy用来销毁相关资源。
编译运行:

1
2
3
4
5
6
7
8
9
10
11
$ gcc -o barrier barrier.c  -lpthread
$ ./barrier
tid:140323085256448
thread 0
tid:140323076863744
thread 1
tid:140323068471040
thread 2
tid:140323060078336
thread 3
all thread finished

比较函数

为了使用qsort函数,我们需要实现自己的比较函数,参考《高级指针话题-函数指针》:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//来源:公众号【编程珠玑】
//https:www.yanbinghu.com
/*比较函数*/
int compare(const void* num1, const void* num2)
{
long l1 = *(long*)num1;
long l2 = *(long*)num2;
if(l1 == l2)
return 0;
else if(l1 < l2)
return -1;
else
return 1;
}

合并

对于每个线程完成它自己的任务之后,需要合并所有内容,关于合并的逻辑前面已经举例了,这里不再多介绍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//来源:公众号【编程珠玑】
//https://www.yanbinghu.com
/*要排序的数组信息*/
typedef struct SortInfo_t
{
long startIdx; //数组启始下标
long num;//要排序的数量
}SortInfo;
/*合并线程已经排序好的内容*/
void merge(SortInfo *sortInfos,size_t threadNum)
{
long idx[threadNum];
memset(idx,0,threadNum);
long i,minidx,sidx,num;
for(i = 0;i < threadNum;i++)
{
idx[i] = sortInfos[i].startIdx;
}
for(sidx = 0;sidx < NUM;sidx++)
{
num = LONG_MAX;
for(i = 0;i < threadNum;i++)
{
if(idx[i] < (sortInfos[i].startIdx + sortInfos[i].num) && (nums[idx[i]] < num))
{
num = nums[idx[i]];
minidx = i;
}
}
snums[sidx] = nums[idx[minidx]];
idx[minidx]++;
}
}

随机数生成

关于生成方法,参考《随机数生成的N种姿势》。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/*来源:公众号【编程珠玑】
https://www.yanbinghu.com
*/
#include <stdio.h>
#include <pthread.h>
#include <limits.h>
#include <sys/time.h>
#include <stdlib.h>
#include <string.h>
#define NUM 8000000L
long nums[NUM];
long snums[NUM];
pthread_barrier_t b;
pthread_mutex_t mutex;
/*比较函数*/
int compare(const void* num1, const void* num2)
{
long l1 = *(long*)num1;
long l2 = *(long*)num2;
if(l1 == l2)
return 0;
else if(l1 < l2)
return -1;
else
return 1;
}
/*要排序的数组信息*/
typedef struct SortInfo_t
{
long startIdx; //数组启始下标
long num;//要排序的数量
}SortInfo;
/*比较线程,采用快速排序*/
void * workThread(void *arg)
{
pthread_mutex_lock(&mutex);
SortInfo *sortInfo = (SortInfo*)arg;
long idx = sortInfo->startIdx;
long num = sortInfo->num;
qsort(&nums[idx],num,sizeof(long),compare);
pthread_mutex_unlock(&mutex);
pthread_barrier_wait(&b);
return ((void*)0);
}
/*合并线程已经排序好的内容*/
void merge(SortInfo *sortInfos,size_t threadNum)
{
long idx[threadNum];
memset(idx,0,threadNum);
long i,minidx,sidx,num;
/*记录索引信息*/
for(i = 0;i < threadNum;i++)
{
idx[i] = sortInfos[i].startIdx;
}
/*遍历各个数据,已经比较过后,比较下标增加*/
for(sidx = 0;sidx < NUM;sidx++)
{
num = LONG_MAX;
for(i = 0;i < threadNum;i++)
{
if(idx[i] < (sortInfos[i].startIdx + sortInfos[i].num) && (nums[idx[i]] < num))
{
num = nums[idx[i]];
minidx = i;
}
}
snums[sidx] = nums[idx[minidx]];
idx[minidx]++;
}
}
int main(int argc,char *argv[])
{
unsigned long i;

/*记录耗费时间*/
struct timeval start,end;
long long startusec,endusec;
double elapsed;


int err;
pthread_t tid;/*线程id*/

long perThreadNum;
long lastThreadNum;


/*获取线程数量,默认为1*/
size_t threadNum = 0;
if(argc > 1)
threadNum = atoi(argv[1]);
if(0 == threadNum)
threadNum = 1;
printf("thread num:%zu\n",threadNum);
SortInfo *sortInfos = malloc(sizeof(SortInfo)*threadNum);
memset(sortInfos,0,sizeof(SortInfo)*threadNum);

/*生成随机数组*/
srandom(time(NULL));
for(i = 0;i < NUM;i++)
{
nums[i] = random();
}

/*如果不能整除,调整最后一个线程处理的数据量*/
long PER_THREAD_NUM = NUM / threadNum;
if(0 != NUM % threadNum)
{
perThreadNum = NUM / (threadNum - 1);
lastThreadNum = NUM % (threadNum - 1);
}
else
{
perThreadNum = PER_THREAD_NUM;
lastThreadNum = PER_THREAD_NUM;
}

gettimeofday(&start,NULL);
pthread_barrier_init(&b,NULL,threadNum + 1);
/*创建线程,并进行排序,传入要排序的部分*/
for(i = 0; i < threadNum;i++)
{
sortInfos[i].startIdx = i*perThreadNum;
sortInfos[i].num = perThreadNum;
if(i == threadNum - 1)
sortInfos[i].num = lastThreadNum;
err = pthread_create(&tid,NULL,workThread,(void*)(&sortInfos[i]));
if(0 != err)
{
printf("create failed\n");
free(sortInfos);
return -1;
}
}
pthread_barrier_wait(&b);
pthread_barrier_destroy(&b);
/*合并*/
merge(&sortInfos[0],threadNum);
gettimeofday(&end,NULL);
startusec = start.tv_sec * 1000000 + start.tv_usec;
endusec = end.tv_sec * 1000000 + end.tv_usec;
elapsed = (double)(endusec - startusec)/1000000.0;
printf("time %f\n",elapsed);
for(i = 0; i < NUM;i++)
{
//printf("%ld\n",snums[i]);
}
free(sortInfos);
return 0;
}

或阅读原文查看。

运行结果

对800W数据进行排序,排序时间:

1
2
3
$ threadSort 1
thread num:1
time 2.369488

使用4个线程时:

1
2
3
$ threadSort 4
thread num:4
time 1.029097

可以看到速度提升是比较明显的。

总结

可以看到使用4线程排序要比单个线程排序快很多,不过以上实现仅供参考,本文例子可能也存在不妥之处,请根据实际数据情况选择合适的排序算法。但是,多线程就一定快吗?敬请关注下一篇。

参考:《unix环境高级编程》

守望 wechat
关注公众号[编程珠玑]获取更多原创技术文章
出入相友,守望相助!