Emulating Google's Cloud Pub/Sub on Apache Kafka

Posted by AppScale Staff on 6/26/18 9:39 AM

In cloud applications composed of micro-services, there are a number of ways to send data between services, but they all have their share of problems. Sending messages directly between two endpoints is simple and requires no additional services, but it breaks down as the number of senders and recipients increases. Message queues and task queues alleviate this by buffering messages, but this requires the sender to know recipients in advance. In a dynamic workload, this list could easily change at any time.

This is where publish-subscribe systems like Google Pub/Sub come into play. Pub/Sub acts as a middleman between services, allowing them to communicate asynchronously. Pub/Sub doesn't require a sender to know the recipient. Instead, recipients subscribe to message streams based on topics. For example, a service that wants to store data in a database might send a message with the topic "database" to the Pub/Sub service, which forwards the message to every service subscribed to that topic. It allows for loose coupling between event producers and event consumers.

For example, Pub/Sub can be used to perform real-time analysis of Twitter data. An application collects tweets and forwards them to Pub/Sub with a topic such as "new-tweets." Meanwhile, several other applications ingest the tweets, analyze their contents, and store the results in a data warehouse. These worker applications are all subscribed to the "new-tweets" topic, meaning they will automatically receive new tweets from Pub/Sub as they arrive. The tweet collector app doesn’t know which worker apps are receiving the tweets, just as the worker apps don't know where the tweets originate.

Although Pub/Sub is a hosted Google Cloud Platform service, Google recently open-sourced an emulation tool that lets you host your own publish-subscribe service. This solution is API-compatible with the cloud Pub/Sub service, allowing you to develop, deploy, or migrate Pub/Sub applications without being locked into the Google Cloud Platform. You can even deploy the service over AppScale, Kubernetes, or a platform of your choice for greater portability and scaling.

A Brief Overview of the Pub/Sub API

Pub/Sub distributes messages among applications. When an application publishes a message, it specifies a topic. The Pub/Sub API uses this topic to determine which subscribers should receive that message. Receivers subscribe to a specific topic, and when a message arrives for that topic, Pub/Sub forwards the message to all receivers. This mapping of topics to receivers is known as a subscription. Multiple receivers can connect to a subscription, and a topic can have multiple subscriptions, but a subscription can only belong to a single topic.
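The relationship between topics and subscriptions can be sketched with a toy in-memory model. This is only an illustration and not the Pub/Sub API: the class TinyPubSub, its method names, and the subscription names are all made up for this example, and it assumes each subscription receives its own copy of every message published to its topic.

```python
from collections import defaultdict


class TinyPubSub(object):
    """Toy in-memory model of topics and subscriptions (not the real API)."""

    def __init__(self):
        # topic name -> names of the subscriptions attached to it
        self.subscriptions_by_topic = defaultdict(list)
        # subscription name -> messages waiting to be pulled
        self.pending = {}

    def create_subscription(self, subscription, topic):
        # A subscription belongs to exactly one topic.
        self.subscriptions_by_topic[topic].append(subscription)
        self.pending[subscription] = []

    def publish(self, topic, message):
        # Every subscription on the topic gets its own copy of the message.
        for subscription in self.subscriptions_by_topic[topic]:
            self.pending[subscription].append(message)

    def pull(self, subscription):
        # Hand over and clear whatever is waiting on this subscription.
        messages = self.pending[subscription]
        self.pending[subscription] = []
        return messages


bus = TinyPubSub()
bus.create_subscription('analytics', 'new-tweets')
bus.create_subscription('archive', 'new-tweets')
bus.publish('new-tweets', 'hello')
print(bus.pull('analytics'))
print(bus.pull('archive'))
```

The publisher only names the topic. It never sees which subscriptions exist, which is the loose coupling described above.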

Setting Up the Project

The project has three key components: Apache Kafka, Apache ZooKeeper, and the Pub/Sub emulator. Kafka provides the backend for the messaging service and relies on ZooKeeper in order to run. The emulator provides the Pub/Sub API that we'll use to relay messages.

To run the project, you will need a Linux host with Java, Apache Maven, Python, and pip installed. The following instructions were performed on a 64-bit Ubuntu 16.04 virtual machine running OpenJDK 8u171, Maven 3.3.9, Python 2.7.12, and pip 10.0.1. You can use either the official Oracle JDK or OpenJDK for the Java runtime. On a freshly booted Ubuntu 16.04 system, logged in as root, you can install the dependencies as follows:

$ apt-get update
$ apt-get install openjdk-8-jdk maven python python-pip 

Running Kafka

Kafka is a distributed platform for reading and writing data streams. Start by downloading the binary release of Kafka (this example uses version 1.0.1, built for Scala 2.11). Extract the tar file and open a terminal in the resulting folder.

$ curl http://mirrors.sorengard.com/apache/kafka/1.0.1/kafka_2.11-1.0.1.tgz | tar xz
$ cd kafka_2.11-1.0.1/

Kafka requires ZooKeeper in order to run. If you have an existing ZooKeeper installation (such as an AppScale ZooKeeper node), you can skip this step. Otherwise, you will need to start a ZooKeeper instance. The Kafka download includes a ZooKeeper startup script that you can run using the following command:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

Next, start the Kafka server. By default, the server listens on port 9092:

$ bin/kafka-server-start.sh config/server.properties
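Before moving on, it can help to confirm that both services are accepting connections. The following is a minimal sketch using only the Python standard library. The helper name port_is_open is made up for this example, and the ports assume the defaults used in this post (2181 for ZooKeeper, 9092 for Kafka).

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        sock = socket.create_connection((host, port), timeout)
    except (socket.error, socket.timeout):
        return False
    sock.close()
    return True


if __name__ == '__main__':
    # Default ports from this post; adjust if your configuration differs.
    for name, port in [('ZooKeeper', 2181), ('Kafka', 9092)]:
        state = 'up' if port_is_open('localhost', port) else 'not reachable'
        print('{} on port {}: {}'.format(name, port, state))
```

A successful TCP connection only shows that something is listening on the port. It does not prove the broker is healthy.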

Now add the topics you want to use in your subscription service. You can specify parameters for each topic, such as the replication factor (the number of servers that each message is replicated to) and the number of partitions (the logs that the topic is sharded into). Specifying localhost:2181 points the command at our local ZooKeeper instance. If you want to use a different ZooKeeper instance, replace this with the host and port of the other instance. For example, the following command creates a topic called "my-topic" on our local server:

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic

Running the Emulator

The Pub/Sub Emulator for Kafka emulates the Pub/Sub API while using Kafka to process the messages. The emulator runs as a standalone Java application, which makes it easy to deploy alone or inside an AppScale deployment. To build the emulator, clone the GitHub repository to your local machine. Open a terminal in the project's root folder, then run mvn package. This generates a self-contained JAR file in the target/ folder.

$ git clone https://github.com/GoogleCloudPlatform/kafka-pubsub-emulator.git
$ cd kafka-pubsub-emulator/
$ mvn package

Before running the emulator, we need to change the configuration file to specify the Kafka server, the topics we've added, and the subscriptions we want to make available. Open a terminal in the emulator project's directory, then copy the configuration template to the root folder:

$ cp src/main/resources/application.yaml .

Open the newly created application.yaml file in a text editor and consider changing the following properties:

  • server.port: the port number that the emulator will run on. You may want to change this to avoid port collisions (for example, AppScale starts assigning ports to apps starting with port 8080).
  • kafka.bootstrapServers: a comma-separated list of brokers (i.e., nodes) in the Kafka cluster. In this example, we only have one running on the local server.
  • kafka.consumer.subscriptions: a list of Subscription objects. The name, topic, and ackDeadlineSeconds fields are required.
  • kafka.producer.topics: a comma-separated list of topics that the emulator exposes for publishing. The emulator can also auto-create topics.

For example, the following configuration starts the emulator on port 8000 using a Kafka broker running on localhost port 9092. It defines a subscription called subscription-to-my-topic, maps that subscription to the topic my-topic, and exposes my-topic for publishing. The examples later in this post use these with a project called my-project.

# application.yaml
server:
  port: 8000

kafka:
  bootstrapServers: localhost:9092
  consumer:
    subscriptions:
      - name: subscription-to-my-topic
        topic: my-topic
        ackDeadlineSeconds: 10
    properties:
      max.poll.records: 2000
  producer:
    topics:
      - my-topic
    properties:
      linger.ms: 5
      batch.size: 1000000
      buffer.memory: 32000000

You can find an explanation of each option as well as a list of other options on the emulator's GitHub page.
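A quick sanity check of the configuration can catch a missing field before the emulator starts. The sketch below works on an already-parsed configuration dictionary (for example, the result of loading application.yaml with a YAML library such as PyYAML's yaml.safe_load). The function name check_config is made up for this example, and it only checks the fields this post describes as required. It is not the emulator's own validation.

```python
REQUIRED_SUBSCRIPTION_FIELDS = ('name', 'topic', 'ackDeadlineSeconds')


def check_config(config):
    """Return a list of problems found in a parsed application.yaml dict."""
    problems = []
    kafka = config.get('kafka', {})
    if not kafka.get('bootstrapServers'):
        problems.append('kafka.bootstrapServers is missing')
    for sub in kafka.get('consumer', {}).get('subscriptions', []):
        for field in REQUIRED_SUBSCRIPTION_FIELDS:
            if field not in sub:
                problems.append('subscription {!r} lacks {}'.format(
                    sub.get('name', '?'), field))
    return problems


# The same settings as the example configuration, written as a dict.
example = {
    'server': {'port': 8000},
    'kafka': {
        'bootstrapServers': 'localhost:9092',
        'consumer': {'subscriptions': [
            {'name': 'subscription-to-my-topic',
             'topic': 'my-topic',
             'ackDeadlineSeconds': 10},
        ]},
        'producer': {'topics': ['my-topic']},
    },
}
print(check_config(example) or 'configuration looks complete')
```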

To start the emulator using the standalone JAR file, run the following command: 

$ java -jar target/kafka-pubsub-emulator-1.0.0.0.jar --configuration.location=application.yaml

Using the API

Using the API in your application requires the Pub/Sub Client Libraries. Normally these libraries would contact the Cloud Pub/Sub service, but instead of pointing them to a cloud endpoint, we'll point them to our local server.

For example, we'll create two Python scripts: one publishes messages, and the other subscribes to the message feed. In order for these scripts to run, you will need to install the Pub/Sub client libraries for Python and the gRPC framework, which is published on PyPI as grpcio. Assuming you have pip installed, run the following command to install both:

$ python -m pip install --upgrade grpcio google-cloud-pubsub

Publishing Messages

On the publisher side, we'll create a script called sender.py that sends a new message to a specific topic. Note that these scripts connect to the emulator over unencrypted connections.

# sender.py
import argparse

import grpc
from google.cloud import pubsub_v1

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Publishes messages to a topic')
    parser.add_argument('--host', required=True,
                        help='The emulator host or IP address')
    parser.add_argument('--port', type=int, required=True,
                        help='The emulator port number')
    parser.add_argument('--project-id', required=True)
    parser.add_argument('--topic-id', required=True)
    parser.add_argument('message')
    args = parser.parse_args()

    # Connect to the emulator over a plaintext gRPC channel instead of the
    # default Cloud Pub/Sub endpoint.
    emulator_location = ':'.join([args.host, str(args.port)])
    channel = grpc.insecure_channel(emulator_location)
    publisher = pubsub_v1.PublisherClient(channel=channel)

    # publish() returns a future; result() blocks until the message is sent.
    topic_path = publisher.topic_path(args.project_id, args.topic_id)
    publisher.publish(topic_path, args.message).result()

To run the script, call python sender.py and pass in the emulator host, port number, project ID, topic ID, and message contents. In this example, we'll send a message to the my-topic topic in my-project. When the emulator receives the message, it maps the message to the correct subscription(s) based on the topic and project. The script exits once the message is sent.

$ python sender.py --host localhost --port 8000 --project-id my-project --topic-id my-topic "This is a message for my-topic on my-project."
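The project ID and topic ID are combined into a single resource name before the request is sent. As a rough sketch, the helpers below build the same style of name that publisher.topic_path produces, along with the matching form for subscriptions. The function names here are made up for illustration and are not part of the client library.

```python
def topic_name(project_id, topic_id):
    """Build a topic resource name such as projects/my-project/topics/my-topic."""
    return 'projects/{}/topics/{}'.format(project_id, topic_id)


def subscription_name(project_id, subscription_id):
    """Build a subscription resource name in the same style."""
    return 'projects/{}/subscriptions/{}'.format(project_id, subscription_id)


print(topic_name('my-project', 'my-topic'))
print(subscription_name('my-project', 'subscription-to-my-topic'))
```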

Receiving Messages

On the subscriber side, we'll create a script that connects to a subscription and automatically receives published messages. After receiving a message, it then prints the contents of the message.

# receiver.py
import argparse
import time

import grpc
from google.cloud import pubsub_v1

def callback(message):
    print(message)
    # Acknowledge the message so it is not redelivered after the ack deadline.
    message.ack()

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Prints messages from a subscription')
    parser.add_argument('--host', required=True,
                        help='The emulator host or IP address')
    parser.add_argument('--port', type=int, required=True,
                        help='The emulator port number')
    parser.add_argument('--project-id', required=True)
    parser.add_argument('--subscription-id', required=True)
    args = parser.parse_args()

    # Subscription resource names look like projects/<project>/subscriptions/<id>.
    path = '/'.join(['projects', args.project_id,
                     'subscriptions', args.subscription_id])
    print('Subscribing to {}'.format(path))

    # Connect to the emulator over a plaintext gRPC channel.
    emulator_location = ':'.join([args.host, str(args.port)])
    channel = grpc.insecure_channel(emulator_location)
    subscriber = pubsub_v1.SubscriberClient(channel=channel)
    subscriber.subscribe(path, callback)

    # Keep the process running while messages arrive in the background.
    while True:
        time.sleep(60)

To run the script, call python receiver.py and pass in the emulator host, port number, project ID, and subscription ID. In this example, we'll subscribe to subscription-to-my-topic in the my-project project. Since the message we sent using the previous script used a topic that maps to subscription-to-my-topic, this script will print the message's contents. The script will continue running until you stop it.

$ python receiver.py --host localhost --port 8000 --project-id my-project --subscription-id subscription-to-my-topic
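In practice, a callback usually wants only the message payload, and it should acknowledge the message once processing succeeds. The sketch below assumes the message object exposes a data attribute holding the payload bytes and an ack() method, as the google-cloud-pubsub client does. The make_callback helper and the FakeMessage class are made up for this example so the logic can be tried without a running emulator.

```python
def make_callback(handler):
    """Wrap handler so it receives decoded text and the message is acked."""
    def callback(message):
        handler(message.data.decode('utf-8'))
        # Ack only after the handler returns without raising.
        message.ack()
    return callback


class FakeMessage(object):
    """Hypothetical stand-in for the client library's message object."""

    def __init__(self, data):
        self.data = data
        self.acked = False

    def ack(self):
        self.acked = True


received = []
callback = make_callback(received.append)
message = FakeMessage(b'This is a message for my-topic on my-project.')
callback(message)
print(received[0])
print(message.acked)
```

Because the acknowledgment happens after the handler, a handler that raises leaves the message unacknowledged, and it becomes eligible for redelivery once ackDeadlineSeconds has passed.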

Conclusion

While message queues are useful, they're limited in their ability to relay messages in complex dynamic environments. Pub/Sub makes it easy to scale your applications while also ensuring messages arrive at their correct destinations.

The Pub/Sub emulator for Kafka is open source, as are the other components used here, Apache Kafka and Apache ZooKeeper, so you can run the entire stack on infrastructure of your choice.
