SCCRもう30回目.
まだまだSC_UdpInPortの続き.
SC_ComPort.cpp: 388
void* SC_UdpInPort::Run()
{
OSC_Packet *packet = 0;
while (true) {
if (!packet) {
// preallocate packet before we need it.
packet = (OSC_Packet*)malloc(sizeof(OSC_Packet));
}
packet->mReplyAddr.mSockAddrLen = sizeof(sockaddr_in);
int size = recvfrom(mSocket, mReadBuf, kMaxUDPSize , 0,
(struct sockaddr *) &packet->mReplyAddr.mSockAddr, (socklen_t*)&packet->mReplyAddr.mSockAddrLen);
if (size > 0) {
char *data = (char*)malloc(size);
memcpy(data, mReadBuf, size);
if (mWorld->mDumpOSC) dumpOSC(mWorld->mDumpOSC, size, data);
packet->mReplyAddr.mReplyFunc = udp_reply_func;
packet->mSize = size;
packet->mData = data;
packet->mReplyAddr.mSocket = mSocket;
if (!ProcessOSCPacket(mWorld, packet))
{
scprintf(“command FIFO full\n”);
free(data);
free(packet);
}
packet = 0;
data = 0;
}
}
return 0;
}
OSC_Packetから見ていく.
struct OSC_Packet
{
char *mData;
int32 mSize;
bool mIsBundle;
ReplyAddress mReplyAddr;
};
struct ReplyAddress
{
struct sockaddr_in mSockAddr;
int mSockAddrLen;
int mSocket;
ReplyFunc mReplyFunc;
};
typedef void (*ReplyFunc)(struct ReplyAddress *inReplyAddr, char* inBuf, int inSize);
ぐらいか.
特に難しいもの無し.
OSC_Packetに受信したデータと,送信元のアドレスを保存.
あとはudp_reply_funcがお返し用関数(のポインタ).
SC_ComPort.cpp: 369
void udp_reply_func(struct ReplyAddress *addr, char* msg, int size)
{
int total = sendallto(addr->mSocket, msg, size, (sockaddr*)&addr->mSockAddr, addr->mSockAddrLen);
if (total < size) DumpReplyAddress(addr);
}
sendalltoを呼んでいる.
SC_ComPort.cpp: 614
int sendallto(int socket, const void *msg, size_t len, struct sockaddr *toaddr, int addrlen)
{
int total = 0;
while (total < (int)len)
{
int numbytes = sendto(socket, msg, len – total, 0, toaddr, addrlen);
if (numbytes < 0) {
scprintf(“sendallto errno %d %s\n”, errno, strerror(errno));
return total;
}
total += numbytes;
msg = (void*)((char*)msg + numbytes);
}
return total;
}
sendalltoは,msgがsendtoで全部送れるまでloopで回しているだけ.
ということで,受信したデータの処理が終わったら,udp_reply_funcを呼んでお返ししている様子.
肝はProcessOSCPacketである.
bool ProcessOSCPacket(World *inWorld, OSC_Packet *inPacket)
{
scprintf(“ProcessOSCPacket %d, ‘%s’\n”, inPacket->mSize, inPacket->mData);
if (!inPacket) return false;
bool result;
inWorld->mDriverLock->Lock();
SC_AudioDriver *driver = AudioDriver(inWorld);
if (!driver) return false;
inPacket->mIsBundle = gIsBundle.checkIsBundle((int32*)inPacket->mData);
FifoMsg fifoMsg;
fifoMsg.Set(inWorld, Perform_ToEngine_Msg, FreeOSCPacket, (void*)inPacket);
result = driver->SendOscPacketMsgToEngine(fifoMsg);
inWorld->mDriverLock->Unlock();
return result;
}
mDriverLockはSC_Lockで,Lock()は
SC_Lock.h: 32
void Lock() { pthread_mutex_lock (&mutex); }
はい,またまた出てきました,pthread.
ということで,やはり先にThreadを片づけねばなりません.
NSThreadに頼りっぱなしでpthread_***は使ったことないのである.
そもそもスレッドを使うのはなぜか.
例えば,前々回のrecvfromを使ったコードは,while(1)のループに入ると,他は何もできません.
そこで,whileループを別の流れで処理させておいて,他の仕事をできるようにする必要があって,そのときには
whileループ + 他の仕事
2個の流れがある.このとき,スレッドが2個あるのでマルチスレッドと言う.
だもんで,socketやるときはthreadが関係してくるのが分かる.
で,2個スレッドがあるということは,場合にもよるが,2個の間でデータが同期していないとまずかったりします.
なのでスレッドにはスレッドを作る,同期する仕組みがあるが,これがかなり難しい.
SCに限って言うと,SCがやりたいのは「いつ飛んでくるか分からないOSCを適切に処理したい」である.
この視点でスレッドを攻略していく.
まずはthreadを作る
int pthread_create(pthread_t * __restrict,
const pthread_attr_t * __restrict,
void *(*)(void *),
void * __restrict);
pthread_t型はスレッドID.このIDでスレッドを識別する.
昔の本を読むと,pthread_tはlongの場合が多いと書いてあるが
OSXでは構造体なので注意.
typedef __darwin_pthread_t pthread_t;
typedef struct _opaque_pthread_t *__darwin_pthread_t;
pthread_attr_tは属性を設定する.NULLだとdefault.
次のvoid *(*)(void *)はvoidポインタを返す関数のポインタを渡す.
threadが作られると,この関数が呼ばれる.
返値は,pthread_joinの最後の引数に渡される.
スレッドの終了ステータスとして使うらしい.いろんなコードを見ていると,0を返す場合が多いみたい.
pthread_join
int pthread_join(pthread_t , void **)
pthread_tのスレッドの終了を待つ.
pthread_self
pthread_createで作った関数内で呼ぶとスレッドのID(pthread_t)が取得できる.
pthread_exit
スレッドを終了する.
これぐらいで何か書けそうなので書いてみる.
#include <iostream>
#include <pthread.h>
using namespace std;
void * thread_function(void *arg) {
cout << pthread_self() << endl;
int count = 5;
while(count){
usleep(1000000);
printf(” while %d\n”,count);
count–;
}
pthread_exit( NULL );
}
int main (int argc, const char * argv[]) {
pthread_t pt;
pthread_create( &pt, NULL, &thread_function, (void*)0);
cout << “_pthread_create” << endl;
return 0;
}
はい,何も実行されません.環境によっては一回ぐらいthread_functionが呼ばれることもあるんだろうか.
int main (int argc, const char * argv[]) {
pthread_t pt;
pthread_create( &pt, NULL, &thread_function, (void*)0);
cout << “pthread_join” << endl;
pthread_join(pt, (void **)0);
cout << “_pthread_join” << endl;
return 0;
}
joinするとこうなる.
pthread_join
while 5
while 4
while 3
while 2
while 1
_pthread_join
これはthread_functionが終わるまで,待って次の処理に移っています.
さっきの例は,thread_functionが実行されるまえにmain関数が終わってしまっていたわけです.
じゃ,mainが終わらないように,次はこうやってみよう.
#include <iostream>
#include <pthread.h>
using namespace std;
void * thread_function(void *arg) {
int count = 5;
while(count){
sleep(1);
cout << ” while “ << count << endl;
count–;
}
pthread_exit( NULL );
}
int main (int argc, const char * argv[]) {
pthread_t pt;
pthread_create( &pt, NULL, &thread_function, (void*)0);
cout << “_pthread_create” << endl;
sleep(8);
return 0;
}
_pthread_create
while 5
while 4
while 3
while 2
while 1
こうやると,thread_functionは別で勝手に動いていて処理は次に移っている.(だから_pthread_createがprintされる)
これぐらい押さえたところで,SCに戻る.
SC_ComPort.cpp: 307
で
Start();
が呼ばれている.
親のSC_CmdPort::Start()が呼ばれる.
SC_ComPort.cpp: 255
void SC_CmdPort::Start()
{
pthread_create (&mThread, NULL, com_thread_func, (void*)this);
set_real_time_priority(mThread);
}
でた,pthread_create.
引数にthisである.
呼ばれる関数はこれ.
void* com_thread_func(void* arg)
{
SC_CmdPort *thread = (SC_CmdPort*)arg;
void* result = thread->Run();
return result;
}
thisを(void*)で渡しているので,argはSC_CmdPort.Run()を実行.
void* SC_UdpInPort::Run()
{
OSC_Packet *packet = 0;
while (true) {
なので,ずっとループ.
で,joinだとかはやっていないので,処理は次に進む.
SC_ComPort.cpp: 258
set_real_time_priority(mThread);
#ifdef SC_LINUX
とかはざくざく消しちゃうことにしました.
void set_real_time_priority(pthread_t thread)
{
int policy;
struct sched_param param;
pthread_getschedparam (thread, &policy, ¶m);
policy = SCHED_RR; // round-robin, AKA real-time scheduling
param.sched_priority = 63; // you’ll have to play with this to see what it does
pthread_setschedparam (thread, policy, ¶m);
}
policyとparamを再セットしている.
http://www.linux.or.jp/JM/html/glibc-linuxthreads/man3/pthread_setschedparam.3.html
pthread_setschedparam はスレッド target_thread のスケジューリングパラメータを policy と param で示される値に変更する。 policy は SCHED_OTHER ( 通常の、リアルタイムでないスケジューリング ) 、 SCHED_RR ( ラウンドロビン方式のリアルタイムスケジューリング ) 、 SCHED_FIFO ( 先入れ先出し (FIFO) 方式のリアルタイムスケジューリング ) のいずれかの値をとる。 param は 2 つのリアルタイムポリシーに対する スケジューリング優先度を表す。 スケジューリングポリシーに関するさらなる情報は sched_setpolicy(2) を参照のこと。
SCHED_OTHER/SCHED_RR/SCHED_FIFO
のうち,SCHED_RRを使っています.コメントにもそう書いてある.
しかし,こいつはなんなんだ?
http://www.linux.or.jp/JM/html/LDP_man-pages/man2/sched_setscheduler.2.html
異なるのはそれぞれのプロセスは最大時間単位までしか実行できない ということである。SCHED_RR プロセスが時間単位と同じかそれより 長い時間実行されると、その優先度のリストの最後に置かれる。
ラウンドロビン
一つの資源を順番に利用する手法。
らしい.
sched_priorityは
CHED_OTHER でスケジューリングされているプロセスは静的優先度 として 0 が指定されなければならず、SCHED_FIFO や SCHED_RR でスケジューリングされているプロセスは 1 から 99 の範囲の 静的優先度を取ることができる。
のスケジューリングの値.
99が一番優先度が高いのか?
書いてないので分からない.
どういうコードを書けば優先度のテストができるのかもよくわからん.
が,こういうコードを考えてみた.
#include <iostream>
#include <pthread.h>
using namespace std;
int countA = 0;
int countB = 0;
pthread_t tidA, tidB;
void *doit(void *vptr){
while(1){
if(pthread_equal(tidA, pthread_self())){
countA++;
}else{
countB++;
}
if(countA + countB > 100000000){
break;
}
}
return 0;
}
void set_priority(pthread_t thread, int priority){
int policy;
struct sched_param param;
pthread_getschedparam (thread, &policy, ¶m);
policy = SCHED_RR; // round-robin, AKA real-time scheduling
param.sched_priority = priority; // you’ll have to play with this to see what it does
pthread_setschedparam (thread, policy, ¶m);
}
int main (int argc, const char * argv[]) {
pthread_create(&tidA, NULL,&doit, NULL);
set_priority(tidA, 99);
pthread_create(&tidB, NULL,&doit, NULL);
set_priority(tidB, 1);
pthread_join(tidA,NULL);
pthread_join(tidB,NULL);
cout << “A : B “ << countA << ” : “ << countB << endl;
return 0;
}
Aが99,Bが1の場合
A : B 99595551 : 404450
逆だと
A : B 435625 : 99564376
おおお,かなり差が出ました.
これはすごい.
63ということは結構高い優先度ですが,最高でもないという微妙なところ.
他に優先する処理のための余りなんでしょう.
で,これは実行するたびに回数は変わるので,あくまで優先度であって,絶対的な何かを保証するものではないことに注意.
さて,SCに戻ると,while(true)内では,
recvfromが何かしら受信するまで待っている.
messageが来ると,ProcessOSCPacketが呼ばれる.
で,最初のinWorld->mDriverLock->Lock();
で,pthread_mutex**に遭遇.
次はmutexをやる.