Neden gerekli?
Ölçeklenebilir bir web ortamı oluşturmak için anlık yapılmayan (asenkron) işlemler çok önemlidir. Bir mesaj kuyruğu imlementasyonu ile, işlemleri sıraya sokup, sırası geldiği zaman işlenmesini sağlayabilirsiniz.
Mesajları bir sıraya sokup işlem yaptırmaya aslında yabancı değiliz; bir çoğumuz kendi "mesaj kuyruğu" implementasyonunu biraz kötü bir şekilde yapmıştır. Örneğin, kullanıcılara gönderilecek e-postaları bir veritabanına(MySQL?) yazıp, , cron job'lar ile belli zaman aralıklarında bu veritabanından bir veriyi çekip, e-postayı gönderdikten sonra bu ilgili kaydı veritabanından silme işlemini yapmışızdır.
Ancak burada bazı sorunlarımız var:
1) Veritabanları bu iş için değil (yaz, belli aralıklarla oku 'select * from queue where completed=0 limit 1', sil)
2) Anlık istek sayınız (concurrency) arttıkça, "işin mesajını"i veritabanına "yazmak", işi tamamlamaktan daha uzun zaman alabiliyor
3) "Ölçekleme"den bashedince, veritabanına "yazma" işlemini ölçeklemeniz, gerçekten çok zor.
4) Üstelik, bu mesaj kuyruğunu implemente etmek, test etmek ve performansını düzeltmek için uzunca bir zaman harcamanız gerekebiliyor.
RabbitMQ bunların hepsini sizin için yapıyor.
Özgür mesaj kuyruğu
Mesaj kuyruğu (veya "enterprise service bus" ) çözümleri, kurumsal pazarda sıkça kullanılan ve pastanın büyük bir payının IBM,Tibco gibi firmaların elinde bulunduğu bir pazardı. 2004 yılında başlayan ve 2006'da olgunlaşan açık standartlı AMQP çözümü ile "mesaj kuyruğu" çözümleri bir standart ile açık hale geldi.
OpenAMQ, Apache QPid ve Red Hat Enterprise MRG gibi kimi kurumsal kimi ise açık çözümler piyasaya çıktı. Bunların arasından RabbitMQ ise, ölçeklenebilirliği, kararlılığı ve basitliği ile ön plana çıktı.
Erlang ile geliştirilen bu uygulama, çoğu Erlang uygılamasında olduğu gibi thread-safe bir yapıya sahip ve kolayca ölçeklenebiliyor.
Bilmeniz gereken bir kaç terim
consumer : Kuyruğu dinleyen uygulama
publisher : Kuyruğa mesaj gönderen uygulama
VirtualHost : VirtualHost'lar, genelde yetki yönetimi için kullanılır, Exchange ve Queue'lar virtualhost'lar içinde tanımlanır
Exchange : Mesajı ilgili "routing key"e göre ilgili queue'ya yönlendiren bölüm
Queue : Mesajların son olarak düştüğü kuyruk
Exchange type : Gelen mesajın, "routing key"e göre hangi queue'ya "nasıl" gönderileceğini belirtir
Kurulum
RabbitMQ kurulumu
RabbitMQ'nun sitesinde Linux ve Windows için hazırlanmış paketler bulunmakta.
Debian testing kullanıcıları
apt-get install rabbitmq-server
komutu ile uygulamayı kurabilirler. Sunucunun 5672 portundan başladığından emin olun :)
İstemci kütüphaneleri kurulumu
- Python kütüphanesi
Linux kullanıcıları aşağıdaki komutlarla py-amqplib kütüphanesini kurabilir
wget -c http://barryp.org/static/software/download/py-amqplib/0.6/amqplib-0.6.tgz
tar -zxvf amqplib-0.6.tgz
cd amqplib-0.6
python setup.py install
- PHP Kütüphanesi
SVN deposundaki kodları çekerek, oluşturacağınız PHP dosyanızı koyacağınız dizinden erişilebilecek bir dizine kopyalayın
svn checkout http://php-amqplib.googlecode.com/svn/trunk/ php-amqplib-read-only
Örnek işlem
Sisteme bir kullanıcı kaydolduğu zaman, onun bulunduğu yere yakın kullanıcılara e-posta gönderen bir uygulama yazacak olalım. Yapmamız gereken işlemler şunlar:
* Veritabanından kaydolan kullanıcının bulunduğu yeri al
* Veritabanından, bu bölgeye yakın diğer kullanıcıları bul
* Bulunan kullanıcılara e-posta gönder
Ve bu işlemin 10 saniye civarında sürdüğünü düşünün. Normalde bu işlemi "anlık" yapmaya kalkarsak, her kayıt olan kullanıcı kayıt sırasında, aslında onu çok da ilgilendirmeyen bir işlem için 10 saniye beklemek zorunda kalacak. Aslında bu tam "kuyruğa gönderilecek" bir iş.
Yazacağımız bir "publisher" ile, çalışmakta olan RabbitMQ sunucumuza "X kullanıcısı kayıt oldu, gerekli işlemler yapılmalı" tarzında bir mesaj göndermemiz ve işlemleri yapacak sorumlu uygulamanın (consumer), sırası gelince bu işi yapmasını sağlayabiliriz. Ve biz mesajı gönderdiğikten sonra kullanıcının bu işlemi beklemesine gerek kalmayacaktır.
Aşağıda PHP ile yazılmış bir "publisher" ve "Python" ile yazılmış bir consumer bu işlemleri yapıyor.
consumer.py
"""
Kullanici kayit islemlerinden gelen mesajlari dinleyen,
mesaj geldiginde ilgili kullanicinin bulundugu yeri veritabanindan cekme,
kullanicinin bulundugu bu yere yakin diger kullanicilari cekme,
ve bulunan diger kullanicilara e-posta gonderme islemlerini
simule eden kod
http://barryp.org/software/py-amqplib/ adresinden py-amqplib'i indirip kurmalisiniz
"""
from amqplib import client_0_8 as amqp
import time # islemlerin surdugu zamanlari simule etmek icin
class rabbitMQMailQueue:
# yapilandirma degiskenleri
def __init__(self):
# rabbitMQ ayarlari
self.rabbitMQConf = {"hostPort" : "localhost:5672",
"user" : "guest",
"pass" : "guest",
"virtualHost" : "/"
}
# kuyrugun ayarlari
self.queueConf = {"name" : "eposta",
"exchange" : "kayit"
}
# kuyrugu dinle ve gelen her mesajda doJobs methodunu calistir
def listen(self):
try:
# amqp'ye baglan
conn = amqp.Connection(self.rabbitMQConf['hostPort'],
self.rabbitMQConf['user'],
self.rabbitMQConf['pass'],
virtualHost = self.rabbitMQConf['virtualHost'],
insist = False)
chan = conn.channel()
chan.queue_declare( self.queueConf['name'],
durable=True, # kuyruk yeniden basladiginda yeniden yaratilsin
exclusive=False,
auto_delete=False) # son islemden sonra kuyrugu silme
chan.exchange_declare(self.queueConf['exchange'],
type="direct", # direkt olarak ilgili kuyruga gonder
durable=True,
auto_delete=False,)
# queue ile exchange'i bagla
chan.queue_bind(self.queueConf['name'], self.queueConf['exchange'])
# islemden sonra cagirilacak methodu ayarla
chan.basic_consume(self.queueConf['name'],
no_ack=True,
callback=self.doJobs) # ilgili methodu calistir
# kuyrugu dinlemeye basla
while True:
chan.wait()
chan.close()
conn.close()
except Exception, error :
print "Bir hata olustu : ",error
# kuyruktan her mesaj geldiginde ilgili kullaniciya ait asagidaki islemleri yap
def doJobs(self,msg):
print "Kuyruktan bir mesaj alindi"
userId = msg.body
print userId, " id'li kullanicinin bolgesi bulunuyor"
time.sleep(1) # islemin 1 saniye surdugunu varsayalim
print userId, " id'li kullanicinin bulundugu yere yakin kullanicilar bulunuyor"
time.sleep(6) # islemin 6 saniye surdugunu varsayalim
print "Bulunan kullanicilara e-posta gonderiliyor"
time.sleep(3) # e-posta gonderme isleminin 3 saniye surdugunu varsayalim
# toplamda islemler 10 saniye suruyor
# bu islem bittikten sonra kuyruktan yeni mesaj aliniyor
# uygulamayi baslat
if __name__ == '__main__':
mailQueue = rabbitMQMailQueue()
mailQueue.listen()
publisher.php
/**
* Kullanici kayit olduktan sonra kuyruga kullanici id'sini
* mesaj gonderen uygulama
*
* http://code.google.com/p/php-amqplib/ adresinden PHP kutuphanesini indirmelisiniz
*/
//indirdiginiz kutuphane icindeki dosyanin yolunu gosterin
require_once('../amqp.inc');
class sendRegisterMessageToQueue {
// yapilandirma
function __construct() {
// rabbitmq ayarlari
$this->rabbitMQConf = array("host"=>"localhost",
"port"=>"5672",
"user"=>"guest",
"pass"=>"guest",
"virtualHost"=>"/");
// kuyruk ayarlari
$this->queueConf = array("exchange"=>"kayit");
}
// kullanici id'sini mesaj olarak gonder
function send($userId) {
try{
// amqp'ye baglan
$conn = new AMQPConnection($this->rabbitMQConf['host'],
$this->rabbitMQConf['port'],
$this->rabbitMQConf['user'],
$this->rabbitMQConf['pass']);
$channel = $conn->channel();
$channel->access_request($this->rabbitMQConf['virtualHost'], false, false, true, true);
// mesajı gönder
$msg = new AMQPMessage($userId, array('content_type' => 'text/plain'));
$channel->basic_publish($msg, $this->queueConf['exchange']);
$channel->close();
$conn->close();
return true;
}
catch(Exception $e) {
echo "Bir hata oluştu ".$e->getMessage();
}
}
}
$messageSender = new sendRegisterMessageToQueue();
$userId = 777;
if ( $messageSender->send($userId ) ) {
echo "Kullanicinin yakinindaki insanlara e-posta gonderme mesaji iletildi";
}
Kaynaklar:
http://www.rabbitmq.com/http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/