میکروسرویس های رویداد محور را با کافکا و پایتون ساده کنید


برای بسیاری از عملکردهای کاربردی حیاتی، از جمله جریان و تجارت الکترونیک، معماری یکپارچه دیگر کافی نیست. با تقاضاهای کنونی برای داده‌های رویداد بلادرنگ و استفاده از سرویس ابری، بسیاری از برنامه‌های کاربردی مدرن، مانند Netflix و Lyft، به رویکرد میکروسرویس‌های رویداد محور تغییر داده‌اند. میکروسرویس‌های مجزا می‌توانند مستقل از یکدیگر عمل کنند و سازگاری و مقیاس‌پذیری پایه کد را افزایش دهند.

اما معماری میکروسرویس های رویداد محور چیست و چرا باید از آن استفاده کنید؟ ما جنبه‌های اساسی را بررسی می‌کنیم و یک طرح کامل برای پروژه میکروسرویس‌های رویداد محور با استفاده از پایتون و آپاچی کافکا ایجاد می‌کنیم.

استفاده از میکروسرویس های رویداد محور

میکروسرویس‌های رویداد محور دو الگوی معماری مدرن را ترکیب می‌کنند: معماری‌های میکروسرویس و معماری‌های رویداد محور. اگرچه ریزسرویس‌ها می‌توانند با معماری‌های REST مبتنی بر درخواست جفت شوند، معماری‌های رویداد محور با افزایش داده‌های بزرگ و محیط‌های پلتفرم ابری مرتبط می‌شوند.

معماری میکروسرویس چیست؟

معماری میکروسرویس یک تکنیک توسعه نرم‌افزار است که فرآیندهای یک برنامه کاربردی را به‌عنوان سرویس‌هایی که با هم جفت شده‌اند سازماندهی می‌کند. این یک نوع معماری سرویس گرا (SOA) است.

در یک ساختار یکپارچه سنتی، تمام فرآیندهای کاربردی ذاتاً به هم مرتبط هستند. اگر یکی از قطعات خراب شود، سیستم از کار می افتد. معماری‌های میکروسرویس‌ها در عوض فرآیندهای برنامه را در سرویس‌های جداگانه گروه‌بندی می‌کنند که با پروتکل‌های سبک در تعامل هستند، و ماژولار بودن بهبود یافته و قابلیت نگهداری و انعطاف‌پذیری بهتر برنامه را ارائه می‌دهند.

معماری میکروسرویس ها (با رابط کاربری جداگانه به میکروسرویس های جداگانه متصل شده است) در مقابل معماری یکپارچه (با منطق و رابط کاربری متصل).
معماری میکروسرویس ها در مقابل معماری یکپارچه

اگرچه برنامه‌های یکپارچه ممکن است برای توسعه، اشکال‌زدایی، آزمایش و استقرار ساده‌تر باشند، اکثر برنامه‌های کاربردی در سطح سازمانی به‌عنوان استاندارد خود به میکروسرویس‌ها روی می‌آورند که به توسعه‌دهندگان اجازه می‌دهد تا اجزای مستقل خود را داشته باشند. میکروسرویس‌های موفق باید تا حد امکان ساده باشند و با استفاده از پیام‌هایی (رویدادها) که تولید و به جریان رویداد ارسال می‌شوند یا از یک جریان رویداد مصرف می‌شوند، ارتباط برقرار کنند. JSON، Apache Avro و بافرهای پروتکل گوگل انتخاب های رایج برای سریال سازی داده ها هستند.

معماری رویداد محور چیست؟

معماری رویداد محور یک الگوی طراحی است که نرم افزار را طوری ساختار می دهد که رویدادها رفتار یک برنامه کاربردی را هدایت کنند. رویدادها داده های معناداری هستند که توسط بازیگران (یعنی کاربران انسانی، برنامه های کاربردی خارجی یا سایر خدمات).

پروژه نمونه ما این معماری را دارد. در هسته آن یک پلت فرم جریان رویداد است که ارتباطات را به دو روش مدیریت می کند:

  • دریافت پیام ها از بازیگرانی که آنها را می نویسند (معمولاً ناشر یا تهیه کننده نامیده می شوند)
  • ارسال پیام به سایر بازیگرانی که آنها را می خوانند (معمولاً مشترکین یا مصرف کنندگان نامیده می شوند)

به عبارت فنی تر، پلت فرم جریان رویداد ما نرم افزاری است که به عنوان لایه ارتباطی بین سرویس ها عمل می کند و به آنها امکان تبادل پیام را می دهد. می تواند الگوهای پیام رسانی مختلفی مانند انتشار/اشتراک و یا پیام نقطه به نقطه، همچنین صف های پیام.

تولیدکننده ای که پیامی را به یک پلتفرم پخش رویداد ارسال می کند، که پیام را به یکی از سه مصرف کننده می فرستد.
معماری رویداد محور

استفاده از معماری رویداد محور با یک پلت فرم جریان رویداد و ریزسرویس ها مزایای زیادی را ارائه می دهد:

  • ارتباطات ناهمزمان: قابلیت چندوظیفه مستقل به سرویس‌ها اجازه می‌دهد تا هر زمان که آماده هستند به جای اینکه منتظر پایان کار قبلی قبل از شروع کار بعدی باشند، به رویدادها واکنش نشان دهند. ارتباطات ناهمزمان پردازش داده ها را در زمان واقعی تسهیل می کند و برنامه ها را واکنش پذیرتر و قابل نگهداری تر می کند.
  • جداسازی کامل و انعطاف پذیری: جداسازی اجزای تولید کننده و مصرف کننده به این معنی است که خدمات فقط باید با پلتفرم جریان رویداد و قالب داده ای که می توانند تولید یا مصرف کنند تعامل داشته باشند. خدمات می توانند از اصل مسئولیت واحد و مقیاس مستقل پیروی کنند. آنها حتی می توانند توسط تیم های توسعه جداگانه با استفاده از پشته های فناوری منحصر به فرد پیاده سازی شوند.
  • قابلیت اطمینان و مقیاس پذیری: ماهیت ناهمزمان و جداشده معماری‌های رویداد محور، قابلیت اطمینان و مقیاس‌پذیری برنامه را بیشتر تقویت می‌کند (که قبلاً از مزایای طراحی معماری میکروسرویس‌ها هستند).

با معماری های رویداد محور، ایجاد سرویس هایی که به هر رویداد سیستمی واکنش نشان می دهند، آسان است. شما همچنین می توانید خطوط لوله نیمه اتوماتیک ایجاد کنید که شامل برخی اقدامات دستی است. (به عنوان مثال، یک خط لوله برای پرداخت های خودکار کاربر ممکن است شامل یک بررسی امنیتی دستی باشد که توسط مقادیر پرداخت غیرعادی بزرگ قبل از انتقال وجه انجام می شود.)

انتخاب پشته فناوری پروژه

ما پروژه خود را با استفاده از Python و Apache Kafka جفت شده با Confluent Cloud ایجاد خواهیم کرد. پایتون یک استاندارد قوی و قابل اعتماد برای بسیاری از انواع پروژه های نرم افزاری است. دارای یک جامعه بزرگ و کتابخانه های فراوان است. این یک انتخاب خوب برای ایجاد میکروسرویس است زیرا چارچوب های آن برای REST و برنامه های رویداد محور (مانند Flask و Django) مناسب است. میکروسرویس های نوشته شده در پایتون نیز معمولاً با آپاچی کافکا استفاده می شوند.

آپاچی کافکا یک پلتفرم پخش جریانی رویداد شناخته شده است که از الگوی پیام رسانی انتشار/اشتراک استفاده می کند. به دلیل اکوسیستم گسترده، مقیاس پذیری (نتیجه توانایی های تحمل خطا)، سیستم ذخیره سازی و توانایی های پردازش جریان، انتخابی رایج برای معماری های رویداد محور است.

در نهایت، ما از Confluent به عنوان پلتفرم ابری خود برای مدیریت موثر کافکا و ارائه زیرساخت های خارج از جعبه استفاده خواهیم کرد. اگر از زیرساخت AWS استفاده می‌کنید، AWS MSK گزینه عالی دیگری است، اما راه‌اندازی Confluent آسان‌تر است زیرا Kafka بخش اصلی سیستم آن است و یک لایه رایگان ارائه می‌دهد.

اجرای طرح پروژه

ما نمونه میکروسرویس های کافکا خود را در Confluent Cloud راه اندازی می کنیم، یک تولید کننده پیام ساده ایجاد می کنیم، سپس آن را سازماندهی و بهبود می دهیم تا مقیاس پذیری را بهینه کنیم. در پایان این آموزش، ما یک تولید کننده پیام کارآمد خواهیم داشت که با موفقیت داده ها را به خوشه ابری ما ارسال می کند.

راه اندازی کافکا

ابتدا یک خوشه کافکا ایجاد می کنیم. خوشه های کافکا میزبان سرورهای کافکا هستند که ارتباطات را تسهیل می کنند. تولیدکنندگان و مصرف‌کنندگان با استفاده از موضوعات کافکا (دسته‌هایی که رکوردها را ذخیره می‌کنند) با سرورها ارتباط برقرار می‌کنند.

  1. ثبت نام برای ابر متقابل. هنگامی که یک حساب کاربری ایجاد می کنید، صفحه خوش آمدگویی با گزینه هایی برای ایجاد یک خوشه جدید کافکا ظاهر می شود. انتخاب کنید پایه ای پیکربندی
  2. ارائه دهنده و منطقه ابری را انتخاب کنید. شما باید انتخاب های خود را برای بهترین نتایج پینگ ابری از موقعیت مکانی خود بهینه کنید. یکی از گزینه ها انتخاب است AWS و انجام یک تست پینگ ابری (کلیک پینگ HTTP) برای شناسایی بهترین منطقه. (برای محدوده آموزش ما، گزینه “Single zone” را در قسمت “Availability” انتخاب می کنیم.)
  3. صفحه بعدی یک تنظیم پرداخت را می خواهد که از آنجایی که در یک ردیف رایگان هستیم، می توانیم از آن صرف نظر کنیم. پس از آن، نام خوشه خود را وارد می کنیم (به عنوان مثال، MyFirstKafkaCluster)، تنظیمات خود را تأیید می کنیم و انتخاب می کنیم راه اندازی خوشه.
صفحه نمایش
پیکربندی خوشه کافکا

با یک خوشه کاری، ما آماده هستیم تا اولین موضوع خود را ایجاد کنیم. در نوار منوی سمت چپ، به مسیر بروید موضوعات و کلیک کنید موضوع ایجاد کنید. یک نام موضوع اضافه کنید (به عنوان مثال، “MyFirstKafkaTopic”) و با تنظیمات پیش فرض (از جمله تنظیم شش پارتیشن) ادامه دهید.

قبل از ایجاد اولین پیام خود، باید مشتری خود را تنظیم کنیم. ما به راحتی می توانیم یک مشتری را پیکربندی کنید از نمای کلی موضوع تازه ایجاد شده ما (به طور متناوب، در نوار منوی سمت چپ، به مشتریان). استفاده خواهیم کرد پایتون به عنوان زبان ما و سپس کلیک کنید کلید API خوشه کافکا را ایجاد کنید.

صفحه Confluent Clients مرحله 2 (پیکربندی کد مشتری) را با تنظیم کلید API خوشه Kafka و قطعه کد پیکربندی نشان می دهد.
راه اندازی کلید Kafka Cluster API

در این مرحله، پلتفرم جریان رویداد ما در نهایت آماده دریافت پیام‌های تولیدکننده ما است.

تولید کننده پیام ساده

تهیه کننده ما رویدادها را تولید می کند و آنها را برای کافکا می فرستد. بیایید یک کد برای ایجاد یک تولید کننده پیام ساده بنویسیم. من توصیه می کنم راه اندازی یک محیط مجازی برای پروژه ما، زیرا ما چندین بسته را در محیط خود نصب خواهیم کرد.

ابتدا، متغیرهای محیطی خود را از پیکربندی API از Confluent Cloud اضافه می کنیم. برای انجام این کار در محیط مجازی خود، اضافه می کنیم export SETTING=value برای هر تنظیم زیر تا پایان ما activate فایل (به طور متناوب، می توانید اضافه کنید SETTING=value به فایل env شما):

export KAFKA_BOOTSTRAP_SERVERS=<bootstrap.servers>
export KAFKA_SECURITY_PROTOCOL=<security.protocol>
export KAFKA_SASL_MECHANISMS=<sasl.mechanisms>
export KAFKA_SASL_USERNAME=<sasl.username>
export KAFKA_SASL_PASSWORD=<sasl.password>

مطمئن شوید که هر ورودی را با مقادیر Confluent Cloud خود جایگزین کنید (به عنوان مثال، <sasl.mechanisms> باید باشد PLAIN)، با کلید و رمز API شما به عنوان نام کاربری و رمز عبور. اجرا کن source env/bin/activate، سپس printenv. تنظیمات جدید ما باید ظاهر شود و تأیید کند که متغیرهای ما به درستی به روز شده اند.

ما از دو بسته پایتون استفاده خواهیم کرد:

ما دستور را اجرا می کنیم pip install confluent-kafka python-dotenv برای نصب اینها بسیاری دیگر وجود دارد بسته هایی برای کافکا در پایتون که ممکن است برای توسعه پروژه خود مفید باشد.

در نهایت، ما تولید کننده اصلی خود را با استفاده از تنظیمات کافکا ایجاد می کنیم. a اضافه کنید simple_producer.py فایل:

# simple_producer.py
import os

from confluent_kafka import KafkaException, Producer
from dotenv import load_dotenv

def main():
    settings = {
        'bootstrap.servers': os.getenv('KAFKA_BOOTSTRAP_SERVERS'),
        'security.protocol': os.getenv('KAFKA_SECURITY_PROTOCOL'),
        'sasl.mechanisms': os.getenv('KAFKA_SASL_MECHANISMS'),
        'sasl.username': os.getenv('KAFKA_SASL_USERNAME'),
        'sasl.password': os.getenv('KAFKA_SASL_PASSWORD'),
    }

    producer = Producer(settings)
    producer.produce(
        topic='MyFirstKafkaTopic',
                      key=None,
                      value='MyFirstValue-111',
    )
    producer.flush()  # Wait for the confirmation that the message was received

if __name__ == '__main__':
    load_dotenv()
    main()

با این کد ساده، ما تولید کننده خود را ایجاد می کنیم و یک پیام آزمایشی ساده برای آن ارسال می کنیم. برای آزمایش نتیجه، اجرا کنید python3 simple_producer.py:

داشبورد نمای کلی کلاستر Confluent، با یک اسپک در نمودارهای تولید (بایت/ثانیه) و ذخیره‌سازی ظاهر می‌شود و هیچ داده‌ای برای مصرف نشان داده نمی‌شود.
خروجی و ذخیره سازی اولین پیام آزمایشی

در حال بررسی خوشه کافکا ما نمای کلی کلاستر > داشبورد، یک نقطه داده جدید را در نمودار تولید خود برای پیام ارسال شده مشاهده خواهیم کرد.

تولید کننده پیام سفارشی

تولید کننده ما فعال است. بیایید کد خود را دوباره سازماندهی کنیم تا پروژه ما مدولارتر شود و OOP پسند. این کار اضافه کردن خدمات و مقیاس پروژه ما را در آینده آسان تر می کند. ما کد خود را به چهار فایل تقسیم می کنیم:

  • kafka_settings.py: پیکربندی های کافکا ما را نگه می دارد.
  • kafka_producer.py: شامل یک عرف است produce() روش و رسیدگی به خطا
  • kafka_producer_message.py: انواع مختلف داده ورودی را کنترل می کند.
  • advanced_producer.py: برنامه نهایی ما را با استفاده از کلاس های سفارشی ما اجرا می کند.

اول، ما KafkaSettings class تنظیمات آپاچی کافکا ما را کپسوله می کند، بنابراین ما می توانیم به راحتی از فایل های دیگر خود بدون تکرار کد به آنها دسترسی داشته باشیم:

# kafka_settings.py
import os

class KafkaSettings:
    def __init__(self):
                      self.conf = {
            'bootstrap.servers': os.getenv('KAFKA_BOOTSTRAP_SERVERS'),
            'security.protocol': os.getenv('KAFKA_SECURITY_PROTOCOL'),
            'sasl.mechanisms': os.getenv('KAFKA_SASL_MECHANISMS'),
            'sasl.username': os.getenv('KAFKA_SASL_USERNAME'),
            'sasl.password': os.getenv('KAFKA_SASL_PASSWORD'),
        }

بعد، ما KafkaProducer به ما اجازه می دهد تا خود را سفارشی کنیم produce() روش با پشتیبانی از خطاهای مختلف (به عنوان مثال، خطا در زمانی که اندازه پیام خیلی بزرگ است)، و همچنین به طور خودکار پیام ها را شستشو می دهد یک بار تولید شد:

# kafka_producer.py
from confluent_kafka import KafkaError, KafkaException, Producer

from kafka_producer_message import ProducerMessage
from kafka_settings import KafkaSettings

class KafkaProducer:
    def __init__(self, settings: KafkaSettings):
        self._producer = Producer(settings.conf)

    def produce(self, message: ProducerMessage):
        try:
            self._producer.produce(message.topic, key=message.key, value=message.value)
            self._producer.flush()
        except KafkaException as exc:
            if exc.args[0].code() == KafkaError.MSG_SIZE_TOO_LARGE:
                pass  # Handle the error here
            else:
                raise exc

در بلوک try-except مثال ما، اگر پیام آنقدر بزرگ باشد که خوشه کافکا نمی تواند آن را مصرف کند، از روی آن می گذریم. با این حال، شما باید کد خود را در مرحله تولید به روز کنید تا این خطا به درستی مدیریت شود. رجوع به confluent-kafka مستندات برای لیست کامل کدهای خطا

حالا ما ProducerMessage کلاس انواع مختلفی از داده های ورودی را مدیریت می کند و آنها را به درستی سریال می کند. ما قابلیت‌هایی را برای واژه‌نامه‌ها، رشته‌های یونیکد و رشته‌های بایت اضافه می‌کنیم:

# kafka_producer_message.py
import json

class ProducerMessage:
    def __init__(self, topic: str, value, key=None) -> None:
        self.topic = f'{topic}'
        self.key = key
        self.value = self.convert_value_to_bytes(value)

    @classmethod
    def convert_value_to_bytes(cls, value):
        if isinstance(value, dict):
            return cls.from_json(value)

        if isinstance(value, str):
            return cls.from_string(value)

        if isinstance(value, bytes):
            return cls.from_bytes(value)

        raise ValueError(f'Wrong message value type: {type(value)}')

    @classmethod
    def from_json(cls, value):
        return json.dumps(value, indent=None, sort_keys=True, default=str, ensure_ascii=False)

    @classmethod
    def from_string(cls, value):
        return value.encode('utf-8')

    @classmethod
    def from_bytes(cls, value):
        return value

در نهایت، ما می‌توانیم برنامه خود را با استفاده از کلاس‌های جدید ایجاد شده در آن بسازیم advanced_producer.py:

# advanced_producer.py
from dotenv import load_dotenv

from kafka_producer import KafkaProducer
from kafka_producer_message import ProducerMessage
from kafka_settings import KafkaSettings

def main():
    settings = KafkaSettings()
    producer = KafkaProducer(settings)
    message = ProducerMessage(
        topic='MyFirstKafkaTopic',
        value={"value": "MyFirstKafkaValue"},
        key=None,
    )
    producer.produce(message)

if __name__ == '__main__':
    load_dotenv()
    main()

ما اکنون یک انتزاع منظم در بالای صفحه داریم confluent-kafka کتابخانه تولید کننده سفارشی ما دارای عملکردی مشابه تولید کننده ساده ما با مقیاس پذیری و انعطاف پذیری بیشتر است و آماده سازگاری با نیازهای مختلف است. حتی اگر بخواهیم می‌توانیم کتابخانه زیربنایی را کاملاً تغییر دهیم، که پروژه ما را برای موفقیت و قابلیت نگهداری طولانی مدت تنظیم می‌کند.

داشبورد Confluent's Cluster Overview: تولید دو جهش را نشان می دهد، Storage دو مرحله (با خطوط افقی) را نشان می دهد، و Consumption هیچ داده ای را نشان نمی دهد.
دومین پیام آزمایشی خروجی و ذخیره سازی

بعد از دویدن python3 advanced_producer.py، دوباره می بینیم که داده ها به خوشه ما در قسمت ارسال شده است نمای کلی کلاستر > داشبورد پانل ابر متجانس. پس از ارسال یک پیام با تولید کننده ساده و پیام دوم با تولید کننده سفارشی ما، اکنون شاهد دو جهش در توان تولید و افزایش ذخیره سازی کلی استفاده شده هستیم.

نگاه به آینده: از تولیدکننده تا مصرف کننده

یک معماری میکروسرویس مبتنی بر رویداد، پروژه شما را بهبود می بخشد و مقیاس پذیری، انعطاف پذیری، قابلیت اطمینان و ارتباطات ناهمزمان آن را بهبود می بخشد. این آموزش نگاهی اجمالی از این مزایا در عمل به شما ارائه کرده است. با راه‌اندازی و راه‌اندازی تولیدکننده در مقیاس سازمانی، و ارسال موفقیت‌آمیز پیام‌ها به کارگزار کافکا، گام‌های بعدی ایجاد مشتری برای خواندن این پیام‌ها از سرویس‌های دیگر و افزودن Docker به برنامه ما خواهد بود.

تیم تحریریه وبلاگ مهندسی تاپتال از شما تشکر می کند E. Deniz Toktay برای بررسی نمونه کدها و سایر مطالب فنی ارائه شده در این مقاله.

ادامه مطلب در وبلاگ مهندسی تاپتال:



منبع

Matthew Newman

Matthew Newman Matthew has over 15 years of experience in database management and software development, with a strong focus on full-stack web applications. He specializes in Django and Vue.js with expertise deploying to both server and serverless environments on AWS. He also works with relational databases and large datasets
[ Back To Top ]