برای بسیاری از عملکردهای کاربردی حیاتی، از جمله جریان و تجارت الکترونیک، معماری یکپارچه دیگر کافی نیست. با تقاضاهای کنونی برای دادههای رویداد بلادرنگ و استفاده از سرویس ابری، بسیاری از برنامههای کاربردی مدرن، مانند Netflix و Lyft، به رویکرد میکروسرویسهای رویداد محور تغییر دادهاند. میکروسرویسهای مجزا میتوانند مستقل از یکدیگر عمل کنند و سازگاری و مقیاسپذیری پایه کد را افزایش دهند.
اما معماری میکروسرویس های رویداد محور چیست و چرا باید از آن استفاده کنید؟ ما جنبههای اساسی را بررسی میکنیم و یک طرح کامل برای پروژه میکروسرویسهای رویداد محور با استفاده از پایتون و آپاچی کافکا ایجاد میکنیم.
استفاده از میکروسرویس های رویداد محور
میکروسرویسهای رویداد محور دو الگوی معماری مدرن را ترکیب میکنند: معماریهای میکروسرویس و معماریهای رویداد محور. اگرچه ریزسرویسها میتوانند با معماریهای REST مبتنی بر درخواست جفت شوند، معماریهای رویداد محور با افزایش دادههای بزرگ و محیطهای پلتفرم ابری مرتبط میشوند.
معماری میکروسرویس چیست؟
معماری میکروسرویس یک تکنیک توسعه نرمافزار است که فرآیندهای یک برنامه کاربردی را بهعنوان سرویسهایی که با هم جفت شدهاند سازماندهی میکند. این یک نوع معماری سرویس گرا (SOA) است.
در یک ساختار یکپارچه سنتی، تمام فرآیندهای کاربردی ذاتاً به هم مرتبط هستند. اگر یکی از قطعات خراب شود، سیستم از کار می افتد. معماریهای میکروسرویسها در عوض فرآیندهای برنامه را در سرویسهای جداگانه گروهبندی میکنند که با پروتکلهای سبک در تعامل هستند، و ماژولار بودن بهبود یافته و قابلیت نگهداری و انعطافپذیری بهتر برنامه را ارائه میدهند.
اگرچه برنامههای یکپارچه ممکن است برای توسعه، اشکالزدایی، آزمایش و استقرار سادهتر باشند، اکثر برنامههای کاربردی در سطح سازمانی بهعنوان استاندارد خود به میکروسرویسها روی میآورند که به توسعهدهندگان اجازه میدهد تا اجزای مستقل خود را داشته باشند. میکروسرویسهای موفق باید تا حد امکان ساده باشند و با استفاده از پیامهایی (رویدادها) که تولید و به جریان رویداد ارسال میشوند یا از یک جریان رویداد مصرف میشوند، ارتباط برقرار کنند. JSON، Apache Avro و بافرهای پروتکل گوگل انتخاب های رایج برای سریال سازی داده ها هستند.
معماری رویداد محور چیست؟
معماری رویداد محور یک الگوی طراحی است که نرم افزار را طوری ساختار می دهد که رویدادها رفتار یک برنامه کاربردی را هدایت کنند. رویدادها داده های معناداری هستند که توسط بازیگران (یعنی کاربران انسانی، برنامه های کاربردی خارجی یا سایر خدمات).
پروژه نمونه ما این معماری را دارد. در هسته آن یک پلت فرم جریان رویداد است که ارتباطات را به دو روش مدیریت می کند:
- دریافت پیام ها از بازیگرانی که آنها را می نویسند (معمولاً ناشر یا تهیه کننده نامیده می شوند)
- ارسال پیام به سایر بازیگرانی که آنها را می خوانند (معمولاً مشترکین یا مصرف کنندگان نامیده می شوند)
به عبارت فنی تر، پلت فرم جریان رویداد ما نرم افزاری است که به عنوان لایه ارتباطی بین سرویس ها عمل می کند و به آنها امکان تبادل پیام را می دهد. می تواند الگوهای پیام رسانی مختلفی مانند انتشار/اشتراک و یا پیام نقطه به نقطه، همچنین صف های پیام.
استفاده از معماری رویداد محور با یک پلت فرم جریان رویداد و ریزسرویس ها مزایای زیادی را ارائه می دهد:
- ارتباطات ناهمزمان: قابلیت چندوظیفه مستقل به سرویسها اجازه میدهد تا هر زمان که آماده هستند به جای اینکه منتظر پایان کار قبلی قبل از شروع کار بعدی باشند، به رویدادها واکنش نشان دهند. ارتباطات ناهمزمان پردازش داده ها را در زمان واقعی تسهیل می کند و برنامه ها را واکنش پذیرتر و قابل نگهداری تر می کند.
- جداسازی کامل و انعطاف پذیری: جداسازی اجزای تولید کننده و مصرف کننده به این معنی است که خدمات فقط باید با پلتفرم جریان رویداد و قالب داده ای که می توانند تولید یا مصرف کنند تعامل داشته باشند. خدمات می توانند از اصل مسئولیت واحد و مقیاس مستقل پیروی کنند. آنها حتی می توانند توسط تیم های توسعه جداگانه با استفاده از پشته های فناوری منحصر به فرد پیاده سازی شوند.
- قابلیت اطمینان و مقیاس پذیری: ماهیت ناهمزمان و جداشده معماریهای رویداد محور، قابلیت اطمینان و مقیاسپذیری برنامه را بیشتر تقویت میکند (که قبلاً از مزایای طراحی معماری میکروسرویسها هستند).
با معماری های رویداد محور، ایجاد سرویس هایی که به هر رویداد سیستمی واکنش نشان می دهند، آسان است. شما همچنین می توانید خطوط لوله نیمه اتوماتیک ایجاد کنید که شامل برخی اقدامات دستی است. (به عنوان مثال، یک خط لوله برای پرداخت های خودکار کاربر ممکن است شامل یک بررسی امنیتی دستی باشد که توسط مقادیر پرداخت غیرعادی بزرگ قبل از انتقال وجه انجام می شود.)
انتخاب پشته فناوری پروژه
ما پروژه خود را با استفاده از Python و Apache Kafka جفت شده با Confluent Cloud ایجاد خواهیم کرد. پایتون یک استاندارد قوی و قابل اعتماد برای بسیاری از انواع پروژه های نرم افزاری است. دارای یک جامعه بزرگ و کتابخانه های فراوان است. این یک انتخاب خوب برای ایجاد میکروسرویس است زیرا چارچوب های آن برای REST و برنامه های رویداد محور (مانند Flask و Django) مناسب است. میکروسرویس های نوشته شده در پایتون نیز معمولاً با آپاچی کافکا استفاده می شوند.
آپاچی کافکا یک پلتفرم پخش جریانی رویداد شناخته شده است که از الگوی پیام رسانی انتشار/اشتراک استفاده می کند. به دلیل اکوسیستم گسترده، مقیاس پذیری (نتیجه توانایی های تحمل خطا)، سیستم ذخیره سازی و توانایی های پردازش جریان، انتخابی رایج برای معماری های رویداد محور است.
در نهایت، ما از Confluent به عنوان پلتفرم ابری خود برای مدیریت موثر کافکا و ارائه زیرساخت های خارج از جعبه استفاده خواهیم کرد. اگر از زیرساخت AWS استفاده میکنید، AWS MSK گزینه عالی دیگری است، اما راهاندازی Confluent آسانتر است زیرا Kafka بخش اصلی سیستم آن است و یک لایه رایگان ارائه میدهد.
اجرای طرح پروژه
ما نمونه میکروسرویس های کافکا خود را در Confluent Cloud راه اندازی می کنیم، یک تولید کننده پیام ساده ایجاد می کنیم، سپس آن را سازماندهی و بهبود می دهیم تا مقیاس پذیری را بهینه کنیم. در پایان این آموزش، ما یک تولید کننده پیام کارآمد خواهیم داشت که با موفقیت داده ها را به خوشه ابری ما ارسال می کند.
راه اندازی کافکا
ابتدا یک خوشه کافکا ایجاد می کنیم. خوشه های کافکا میزبان سرورهای کافکا هستند که ارتباطات را تسهیل می کنند. تولیدکنندگان و مصرفکنندگان با استفاده از موضوعات کافکا (دستههایی که رکوردها را ذخیره میکنند) با سرورها ارتباط برقرار میکنند.
- ثبت نام برای ابر متقابل. هنگامی که یک حساب کاربری ایجاد می کنید، صفحه خوش آمدگویی با گزینه هایی برای ایجاد یک خوشه جدید کافکا ظاهر می شود. انتخاب کنید پایه ای پیکربندی
- ارائه دهنده و منطقه ابری را انتخاب کنید. شما باید انتخاب های خود را برای بهترین نتایج پینگ ابری از موقعیت مکانی خود بهینه کنید. یکی از گزینه ها انتخاب است AWS و انجام یک تست پینگ ابری (کلیک پینگ HTTP) برای شناسایی بهترین منطقه. (برای محدوده آموزش ما، گزینه “Single zone” را در قسمت “Availability” انتخاب می کنیم.)
- صفحه بعدی یک تنظیم پرداخت را می خواهد که از آنجایی که در یک ردیف رایگان هستیم، می توانیم از آن صرف نظر کنیم. پس از آن، نام خوشه خود را وارد می کنیم (به عنوان مثال، MyFirstKafkaCluster)، تنظیمات خود را تأیید می کنیم و انتخاب می کنیم راه اندازی خوشه.
با یک خوشه کاری، ما آماده هستیم تا اولین موضوع خود را ایجاد کنیم. در نوار منوی سمت چپ، به مسیر بروید موضوعات و کلیک کنید موضوع ایجاد کنید. یک نام موضوع اضافه کنید (به عنوان مثال، “MyFirstKafkaTopic”) و با تنظیمات پیش فرض (از جمله تنظیم شش پارتیشن) ادامه دهید.
قبل از ایجاد اولین پیام خود، باید مشتری خود را تنظیم کنیم. ما به راحتی می توانیم یک مشتری را پیکربندی کنید از نمای کلی موضوع تازه ایجاد شده ما (به طور متناوب، در نوار منوی سمت چپ، به مشتریان). استفاده خواهیم کرد پایتون به عنوان زبان ما و سپس کلیک کنید کلید API خوشه کافکا را ایجاد کنید.
در این مرحله، پلتفرم جریان رویداد ما در نهایت آماده دریافت پیامهای تولیدکننده ما است.
تولید کننده پیام ساده
تهیه کننده ما رویدادها را تولید می کند و آنها را برای کافکا می فرستد. بیایید یک کد برای ایجاد یک تولید کننده پیام ساده بنویسیم. من توصیه می کنم راه اندازی یک محیط مجازی برای پروژه ما، زیرا ما چندین بسته را در محیط خود نصب خواهیم کرد.
ابتدا، متغیرهای محیطی خود را از پیکربندی API از Confluent Cloud اضافه می کنیم. برای انجام این کار در محیط مجازی خود، اضافه می کنیم export SETTING=value
برای هر تنظیم زیر تا پایان ما activate
فایل (به طور متناوب، می توانید اضافه کنید SETTING=value
به فایل env شما):
export KAFKA_BOOTSTRAP_SERVERS=<bootstrap.servers>
export KAFKA_SECURITY_PROTOCOL=<security.protocol>
export KAFKA_SASL_MECHANISMS=<sasl.mechanisms>
export KAFKA_SASL_USERNAME=<sasl.username>
export KAFKA_SASL_PASSWORD=<sasl.password>
مطمئن شوید که هر ورودی را با مقادیر Confluent Cloud خود جایگزین کنید (به عنوان مثال، <sasl.mechanisms>
باید باشد PLAIN
)، با کلید و رمز API شما به عنوان نام کاربری و رمز عبور. اجرا کن source env/bin/activate
، سپس printenv
. تنظیمات جدید ما باید ظاهر شود و تأیید کند که متغیرهای ما به درستی به روز شده اند.
ما از دو بسته پایتون استفاده خواهیم کرد:
ما دستور را اجرا می کنیم pip install confluent-kafka python-dotenv
برای نصب اینها بسیاری دیگر وجود دارد بسته هایی برای کافکا در پایتون که ممکن است برای توسعه پروژه خود مفید باشد.
در نهایت، ما تولید کننده اصلی خود را با استفاده از تنظیمات کافکا ایجاد می کنیم. a اضافه کنید simple_producer.py
فایل:
# simple_producer.py
import os
from confluent_kafka import KafkaException, Producer
from dotenv import load_dotenv
def main():
settings = {
'bootstrap.servers': os.getenv('KAFKA_BOOTSTRAP_SERVERS'),
'security.protocol': os.getenv('KAFKA_SECURITY_PROTOCOL'),
'sasl.mechanisms': os.getenv('KAFKA_SASL_MECHANISMS'),
'sasl.username': os.getenv('KAFKA_SASL_USERNAME'),
'sasl.password': os.getenv('KAFKA_SASL_PASSWORD'),
}
producer = Producer(settings)
producer.produce(
topic='MyFirstKafkaTopic',
key=None,
value='MyFirstValue-111',
)
producer.flush() # Wait for the confirmation that the message was received
if __name__ == '__main__':
load_dotenv()
main()
با این کد ساده، ما تولید کننده خود را ایجاد می کنیم و یک پیام آزمایشی ساده برای آن ارسال می کنیم. برای آزمایش نتیجه، اجرا کنید python3 simple_producer.py
:
در حال بررسی خوشه کافکا ما نمای کلی کلاستر > داشبورد، یک نقطه داده جدید را در نمودار تولید خود برای پیام ارسال شده مشاهده خواهیم کرد.
تولید کننده پیام سفارشی
تولید کننده ما فعال است. بیایید کد خود را دوباره سازماندهی کنیم تا پروژه ما مدولارتر شود و OOP پسند. این کار اضافه کردن خدمات و مقیاس پروژه ما را در آینده آسان تر می کند. ما کد خود را به چهار فایل تقسیم می کنیم:
-
kafka_settings.py
: پیکربندی های کافکا ما را نگه می دارد. -
kafka_producer.py
: شامل یک عرف استproduce()
روش و رسیدگی به خطا -
kafka_producer_message.py
: انواع مختلف داده ورودی را کنترل می کند. -
advanced_producer.py
: برنامه نهایی ما را با استفاده از کلاس های سفارشی ما اجرا می کند.
اول، ما KafkaSettings
class تنظیمات آپاچی کافکا ما را کپسوله می کند، بنابراین ما می توانیم به راحتی از فایل های دیگر خود بدون تکرار کد به آنها دسترسی داشته باشیم:
# kafka_settings.py
import os
class KafkaSettings:
def __init__(self):
self.conf = {
'bootstrap.servers': os.getenv('KAFKA_BOOTSTRAP_SERVERS'),
'security.protocol': os.getenv('KAFKA_SECURITY_PROTOCOL'),
'sasl.mechanisms': os.getenv('KAFKA_SASL_MECHANISMS'),
'sasl.username': os.getenv('KAFKA_SASL_USERNAME'),
'sasl.password': os.getenv('KAFKA_SASL_PASSWORD'),
}
بعد، ما KafkaProducer
به ما اجازه می دهد تا خود را سفارشی کنیم produce()
روش با پشتیبانی از خطاهای مختلف (به عنوان مثال، خطا در زمانی که اندازه پیام خیلی بزرگ است)، و همچنین به طور خودکار پیام ها را شستشو می دهد یک بار تولید شد:
# kafka_producer.py
from confluent_kafka import KafkaError, KafkaException, Producer
from kafka_producer_message import ProducerMessage
from kafka_settings import KafkaSettings
class KafkaProducer:
def __init__(self, settings: KafkaSettings):
self._producer = Producer(settings.conf)
def produce(self, message: ProducerMessage):
try:
self._producer.produce(message.topic, key=message.key, value=message.value)
self._producer.flush()
except KafkaException as exc:
if exc.args[0].code() == KafkaError.MSG_SIZE_TOO_LARGE:
pass # Handle the error here
else:
raise exc
در بلوک try-except مثال ما، اگر پیام آنقدر بزرگ باشد که خوشه کافکا نمی تواند آن را مصرف کند، از روی آن می گذریم. با این حال، شما باید کد خود را در مرحله تولید به روز کنید تا این خطا به درستی مدیریت شود. رجوع به confluent-kafka
مستندات برای لیست کامل کدهای خطا
حالا ما ProducerMessage
کلاس انواع مختلفی از داده های ورودی را مدیریت می کند و آنها را به درستی سریال می کند. ما قابلیتهایی را برای واژهنامهها، رشتههای یونیکد و رشتههای بایت اضافه میکنیم:
# kafka_producer_message.py
import json
class ProducerMessage:
def __init__(self, topic: str, value, key=None) -> None:
self.topic = f'{topic}'
self.key = key
self.value = self.convert_value_to_bytes(value)
@classmethod
def convert_value_to_bytes(cls, value):
if isinstance(value, dict):
return cls.from_json(value)
if isinstance(value, str):
return cls.from_string(value)
if isinstance(value, bytes):
return cls.from_bytes(value)
raise ValueError(f'Wrong message value type: {type(value)}')
@classmethod
def from_json(cls, value):
return json.dumps(value, indent=None, sort_keys=True, default=str, ensure_ascii=False)
@classmethod
def from_string(cls, value):
return value.encode('utf-8')
@classmethod
def from_bytes(cls, value):
return value
در نهایت، ما میتوانیم برنامه خود را با استفاده از کلاسهای جدید ایجاد شده در آن بسازیم advanced_producer.py
:
# advanced_producer.py
from dotenv import load_dotenv
from kafka_producer import KafkaProducer
from kafka_producer_message import ProducerMessage
from kafka_settings import KafkaSettings
def main():
settings = KafkaSettings()
producer = KafkaProducer(settings)
message = ProducerMessage(
topic='MyFirstKafkaTopic',
value={"value": "MyFirstKafkaValue"},
key=None,
)
producer.produce(message)
if __name__ == '__main__':
load_dotenv()
main()
ما اکنون یک انتزاع منظم در بالای صفحه داریم confluent-kafka
کتابخانه تولید کننده سفارشی ما دارای عملکردی مشابه تولید کننده ساده ما با مقیاس پذیری و انعطاف پذیری بیشتر است و آماده سازگاری با نیازهای مختلف است. حتی اگر بخواهیم میتوانیم کتابخانه زیربنایی را کاملاً تغییر دهیم، که پروژه ما را برای موفقیت و قابلیت نگهداری طولانی مدت تنظیم میکند.
بعد از دویدن python3 advanced_producer.py
، دوباره می بینیم که داده ها به خوشه ما در قسمت ارسال شده است نمای کلی کلاستر > داشبورد پانل ابر متجانس. پس از ارسال یک پیام با تولید کننده ساده و پیام دوم با تولید کننده سفارشی ما، اکنون شاهد دو جهش در توان تولید و افزایش ذخیره سازی کلی استفاده شده هستیم.
نگاه به آینده: از تولیدکننده تا مصرف کننده
یک معماری میکروسرویس مبتنی بر رویداد، پروژه شما را بهبود می بخشد و مقیاس پذیری، انعطاف پذیری، قابلیت اطمینان و ارتباطات ناهمزمان آن را بهبود می بخشد. این آموزش نگاهی اجمالی از این مزایا در عمل به شما ارائه کرده است. با راهاندازی و راهاندازی تولیدکننده در مقیاس سازمانی، و ارسال موفقیتآمیز پیامها به کارگزار کافکا، گامهای بعدی ایجاد مشتری برای خواندن این پیامها از سرویسهای دیگر و افزودن Docker به برنامه ما خواهد بود.
تیم تحریریه وبلاگ مهندسی تاپتال از شما تشکر می کند E. Deniz Toktay برای بررسی نمونه کدها و سایر مطالب فنی ارائه شده در این مقاله.