Using Google Pub/Sub Messaging with AppScale

Posted by AppScale Staff on 7/19/18 11:23 AM

In a previous post, we explained how you can host your own version of Google Pub/Sub using an open source emulator. The emulator—provided by Google—implements the Pub/Sub API using Apache Kafka as a data streaming backend. The result is a distributed messaging system that is fully compatible with Pub/Sub apps without locking you into the Google Cloud Platform (GCP).

In this post, we'll demonstrate how to configure AppScale apps to send and receive messages using the emulator.

Why use Pub/Sub with AppScale?

As we mentioned in the previous post, the benefit of publish-subscribe systems like Pub/Sub is that they facilitate asynchronous messaging among various services. One service can send a message to multiple recipients without having to know who the recipients are, where they are, or how many there are. For distributed platforms like AppScale, this lets apps communicate without limiting your ability to deploy, remove, replicate, or scale them.

AppScale's key strength is that it can run on a number of different platforms. Although you don't have to run your apps in GCP to use Pub/Sub, the service is tied to the platform. With Google's open source emulator, you no longer have to be committed to GCP, AWS, or any other cloud platform to use publish/subscribe-style messaging.


Deploying Pub/Sub-Compatible Apps

To demonstrate Pub/Sub on AppScale, we'll deploy two apps: one that publishes messages to a topic, and one that receives them. Before you begin, make sure you follow the "Setting up the Project" section of our previous post to create a working installation of the Pub/Sub emulator. You should also have a working AppScale installation. The emulator must be accessible over the network by your AppScale nodes, since our apps will need to contact it in order to send and receive messages.

Note on Running Kafka

When running Kafka as part of the emulator setup process, you don't need to start a new ZooKeeper server if you're installing the emulator on an AppScale ZooKeeper node. Instead, you can simply start the Kafka server and use the existing ZooKeeper instance.

After completing the setup process, you should have the emulator running on port 8000. It should also be configured with a project called my-project, a topic called my-topic, and a subscription called subscription-to-my-topic.

In the previous post, the example apps opened persistent connections to the emulator using the gRPC protocol. This protocol isn't currently supported in AppScale, so instead we'll use REST API calls to interact with the messaging service via a gateway service. Since App Engine and AppScale requests are often short-lived, we can use a separate worker script to push messages to subscribers using this gateway.


Deploying the REST API Gateway

The REST API gateway allows App Engine apps to send and receive messages using HTTP calls. This enables apps to interact with the messaging backend without having to open a persistent connection.

To install and run the gateway service, you can follow the installation instructions provided on the gateway's GitHub page. (Note that if you choose to compile a standalone build, you should use Ubuntu 18.04 / Bionic Beaver since its repositories contain supported versions of the required packages.) You can also deploy the gateway using Docker or Kubernetes. In the following instructions we rely on a compiled binary that should run on any x86 Linux system.

When running the gateway, specify both the address of the Pub/Sub emulator and the port that the gateway itself will listen on. For example, these commands download the gateway binary, connect it to the emulator on port 8000, and expose the gateway on port 8001:

$ curl -O https://s3.amazonaws.com/appscale-build/kafka-pubsub-emulator-gateway
$ chmod uog+rx kafka-pubsub-emulator-gateway
$ ./kafka-pubsub-emulator-gateway start --address 8000 --port 8001
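
This post does not cover the gateway's exact routes. As a rough sketch only, assuming the gateway mirrors the public Cloud Pub/Sub REST API (a POST to /v1/projects/PROJECT/topics/TOPIC:publish with base64-encoded message data), a publish request could be assembled as follows. The build_publish_request helper and the route are assumptions for illustration; check the gateway's documentation before relying on them.

```python
import base64
import json


def build_publish_request(gateway_url, project, topic, data):
    """Return the (url, body) pair for a Pub/Sub-style REST publish call.

    The route below is assumed to match the Cloud Pub/Sub REST API; it has
    not been verified against the gateway itself.
    """
    url = '{}/v1/projects/{}/topics/{}:publish'.format(
        gateway_url.rstrip('/'), project, topic)
    # Pub/Sub REST payloads carry message data as base64 text.
    encoded = base64.b64encode(data).decode('ascii')
    body = json.dumps({'messages': [{'data': encoded}]})
    return url, body


url, body = build_publish_request(
    'http://localhost:8001', 'my-project', 'my-topic', b'Hello world!')
print(url)
print(body)
```

The helper only builds the request, so it can be checked without a running gateway. Sending it would be a matter of passing the URL and body to any HTTP client.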

Deploying the Receiver App

The first AppScale app we'll create is a Python script that receives Pub/Sub messages. The script simply logs incoming messages to the console. It confirms the authenticity of the message using a token shared with the sender app.

Open a terminal on your AppScale workstation and create a new directory to hold the app's files:

$ mkdir pubsub-receiver
$ cd pubsub-receiver/

Creating the Receiver Script

Create a new file called receiver.py with the following contents.

import base64
import json
import logging
import os

import webapp2


class PushHandler(webapp2.RequestHandler):
    def post(self):
        # Reject requests that don't carry the shared verification token.
        token = os.environ['PUBSUB_VERIFICATION_TOKEN']
        if self.request.get('token') != token:
            self.response.set_status(400)
            self.response.write('Invalid token')
            return

        # The message data arrives base64-encoded inside a JSON envelope.
        envelope = json.loads(self.request.body)
        payload = base64.b64decode(envelope['message']['data'])
        logging.info("Received message: {}".format(payload))
        self.response.write('OK')


app = webapp2.WSGIApplication([
    ('/pubsub/push', PushHandler)
])
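
To make the expected request body concrete, here is a minimal sketch of the envelope that the handler decodes. It uses only the standard library. The decode_push_body helper and the sample body are illustrative and not part of the receiver app.

```python
import base64
import json


def decode_push_body(body):
    """Extract the raw message bytes from a push-style JSON envelope."""
    envelope = json.loads(body)
    return base64.b64decode(envelope['message']['data'])


# A sample envelope in the shape the receiver reads:
# {"message": {"data": "<base64 text>"}}
sample_body = json.dumps({
    'message': {
        'data': base64.b64encode(b'Hello world!').decode('ascii'),
    }
})

print(decode_push_body(sample_body))
```

Posting a body like sample_body to /pubsub/push with the correct token query parameter is one way to exercise the receiver on its own, before the rest of the pipeline is in place.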

Configuring app.yaml

Next, we need to create an app.yaml file to store the project's configuration. This file also contains a token used to verify the authenticity of incoming messages. This token can be any alphanumeric value of your choice, and will be passed into the app on deployment.

application: pubsub-receiver
runtime: python27
api_version: 1
threadsafe: true

handlers:
- url: .*
  script: receiver.app

env_variables:
  PUBSUB_VERIFICATION_TOKEN: <secret token>
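
If you need a value for the token, one option is to generate it from random bytes. The following is a minimal sketch; generate_token is a hypothetical helper name, and the 16-byte length is an arbitrary choice.

```python
import binascii
import os


def generate_token(num_bytes=16):
    """Return a random hexadecimal (alphanumeric) token."""
    # os.urandom draws from the operating system's random source.
    return binascii.hexlify(os.urandom(num_bytes)).decode('ascii')


print(generate_token())
```

Paste the printed value into app.yaml in place of the placeholder, and reuse the same value wherever the token is required later in this post.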

Now, deploy the receiver application to AppScale:

$ sudo -i
# appscale deploy 

You can verify that the app was successfully deployed by running appscale status or by checking the AppScale dashboard.


Deploying a Sender Application

Next, we'll create an app to send messages to the receiver app. This is another Python script that accepts an HTTP POST request and forwards the body of the message to the messaging service.

From the terminal, navigate to the parent directory of your receiver app and create a new folder for the sender project:

$ cd ..
$ mkdir pubsub-sender
$ cd pubsub-sender/

Create a new file called sender.py with the following contents:

import os

import webapp2
from google.cloud import pubsub


class IndexHandler(webapp2.RequestHandler):
    def post(self):
        # Publish the raw request body to the configured topic.
        client = pubsub.Client()
        topic = client.topic(os.environ['PUBSUB_TOPIC'])
        topic.publish(self.request.body)

        self.response.write('OK')


app = webapp2.WSGIApplication([
    ('/', IndexHandler)
])

Managing Third-Party Libraries

This script relies on the Google Cloud Pub/Sub Client Libraries. Since these aren't already available in AppScale, we need to copy the libraries to our app directory.

From the app's root directory, create a new folder called lib.

$ mkdir lib

Use pip to download and install the library into the folder:

$ pip install -t lib/ google-cloud-pubsub==0.22.0

Now, create a file named appengine_config.py in the project's root folder with the following contents:

from google.appengine.ext import vendor
vendor.add('lib')

Configuring app.yaml

The sender's app.yaml file requires several variables including the Pub/Sub emulator URL, the project ID, and the topic ID. It also requires a verification token, which should be identical to the one used in the receiver app.

application: pubsub-sender
runtime: python27
api_version: 1
threadsafe: true

handlers:
- url: .*
  script: sender.app

env_variables:
  PUBSUB_EMULATOR_HOST: 'localhost:8000'
  PUBSUB_PROJECT: 'my-project'
  PUBSUB_TOPIC: 'my-topic'
  PUBSUB_VERIFICATION_TOKEN: <secret token>

libraries:
- name: webapp2
  version: "2.5.2"

Now, deploy the sender app to AppScale:

$ sudo -i
# appscale deploy 


Deploying the Push Notification Worker

For the receiver app to receive new messages automatically, we need to translate Pub/Sub messages into REST calls. We can do this with a worker script that maintains a persistent gRPC connection to the emulator, listens for new messages on a subscription, and forwards the contents of each message to an AppScale endpoint. This provides the same functionality as the Cloud Pub/Sub push subscription model, but in a way that's compatible with the Pub/Sub emulator.

Start by opening a terminal and creating a new folder to contain the app's project files.

$ mkdir push-worker
$ cd push-worker/

Create a new file called worker.py with the following contents:

import argparse
import base64
import logging
import time

import backoff
import grpc
import requests
import yaml
import urllib3

from google.cloud import pubsub
from requests.exceptions import HTTPError, RequestException

LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s '

# Response codes that count as a successful delivery.
SUCCESS_CODES = (102, 200, 201, 202, 204)


# Retry with exponential backoff whenever the request fails.
@backoff.on_exception(backoff.expo, RequestException)
def make_request(endpoint, data):
    response = requests.post(endpoint, json=data)
    if response.status_code not in SUCCESS_CODES:
        raise HTTPError('Received response code: {}'.format(response.status_code))


def callback_factory(endpoint):
    def callback(message):
        size = len(message.data)
        logging.info('Pushing message to {}. Size: {}'.format(endpoint, size))
        payload = {
            "message": {
                "data": base64.b64encode(message.data).decode('utf8'),
            }
        }
        make_request(endpoint, payload)
        # Acknowledge only after the endpoint has accepted the message.
        message.ack()

    return callback


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='pushes messages from Kafka topics to HTTP endpoints')
    parser.add_argument('config', help='the push worker configuration file')
    args = parser.parse_args()

    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)

    with open(args.config) as config_file:
        config = yaml.safe_load(config_file)

    channel = grpc.insecure_channel(config['pubsub_endpoint'])
    subscriber = pubsub.SubscriberClient(channel=channel)

    for subscription in config['subscriptions']:
        subscription_path = '/'.join([
            'projects', subscription['project_id'],
            'subscriptions', subscription['subscription_id']])
        logging.info('Subscribing to {}'.format(subscription_path))
        subscription_function = callback_factory(subscription['endpoint'])
        subscriber.subscribe(subscription_path, subscription_function)

    # Keep the main thread alive while the subscriber runs in the background.
    while True:
        time.sleep(60)

This script requires a number of libraries. The easiest way to install these is to create and activate a virtual environment (this assumes the virtualenv tool is installed):

$ virtualenv env
$ source env/bin/activate
$ pip install backoff grpcio requests pyyaml urllib3 google-cloud-pubsub

We'll store the parameters for this script in a separate configuration file. Create a new file named config.yaml in the same directory with the following contents:

pubsub_endpoint: localhost:8000
subscriptions:
 - project_id: my-project
   subscription_id: subscription-to-my-topic
   endpoint: http:///pubsub/push?token=<secret token>

The pubsub_endpoint is the URL of your Pub/Sub emulator. If you chose a different name for your project ID or subscription ID, be sure to replace those values here as well. The endpoint is the URL of your receiver app after it's deployed to AppScale. At the end of the endpoint, replace <secret token> with the same token you used for the receiver and sender apps.
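
Because a missing key in this file only surfaces as a KeyError once the worker starts, it can help to check the parsed configuration first. The following is a minimal sketch that works on the dictionary produced by loading the YAML file. The find_config_problems and subscription_path helpers are hypothetical names, not part of the worker.

```python
REQUIRED_KEYS = ('project_id', 'subscription_id', 'endpoint')


def subscription_path(project_id, subscription_id):
    """Build the same subscription path that the worker subscribes to."""
    return '/'.join(['projects', project_id, 'subscriptions', subscription_id])


def find_config_problems(config):
    """Return a list of problems found; an empty list means none were found."""
    problems = []
    if not config.get('pubsub_endpoint'):
        problems.append('missing pubsub_endpoint')
    subscriptions = config.get('subscriptions') or []
    if not subscriptions:
        problems.append('no subscriptions defined')
    for index, subscription in enumerate(subscriptions):
        for key in REQUIRED_KEYS:
            if not subscription.get(key):
                problems.append(
                    'subscription {} is missing {}'.format(index, key))
    return problems


print(subscription_path('my-project', 'subscription-to-my-topic'))
```

With the example values above, subscription_path produces projects/my-project/subscriptions/subscription-to-my-topic.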

Finally, run the script by passing in the configuration file as a parameter.

$ python worker.py config.yaml
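
The worker delivers each message by retrying the HTTP request with exponentially growing delays, and acknowledges the message only once the request succeeds. To illustrate the idea, here is a simplified standard-library sketch of that retry pattern. It is not the backoff library's implementation. The retry_with_backoff helper, its max_tries limit, and the injectable sleep function are assumptions added for illustration.

```python
import time


def retry_with_backoff(func, max_tries=5, base_delay=1.0, sleep=time.sleep):
    """Call func until it succeeds, doubling the delay after each failure."""
    for attempt in range(max_tries):
        try:
            return func()
        except Exception:
            # Give up and re-raise once the final attempt has failed.
            if attempt == max_tries - 1:
                raise
            # Delays grow as base_delay * 1, 2, 4, 8, ...
            sleep(base_delay * 2 ** attempt)
```

Passing a different sleep function makes the helper easy to exercise without waiting, for example by recording the requested delays in a list.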

Testing the Deployment

Now that all of the required components have been deployed, you can test your implementation by sending an HTTP POST request to the sender app's endpoint. The body of the request is published as the contents of the message.

For example, we'll send a basic "hello world" using curl:

$ curl --data 'Hello world!' http://

You can verify that the message was sent by checking AppScale's logs for the sender app:

# tail /var/log/appscale/app___pubsub-sender_default_v1.log

Jul  12 15:25:45 appscale-image0 DEBUG:google.cloud.pubsub_v1.publisher.batch.thread:gRPC Publish took 0.0360200405121 seconds.

Likewise, you can verify that the receiver has received the message by opening the receiver app's logs:

# tail /var/log/appscale/app___pubsub-receiver_default_v1.log

Jul 12 15:25:50 appscale-image0 INFO 	2018-07-13 15:25:50,676 receiver.py:24] Received message: Hello world!


Conclusion

Pub/Sub is an extremely versatile platform with many practical applications, and Google's emulator makes it possible to run an API-compatible Pub/Sub service on any platform. Instead of relying on the Google Cloud Platform, you can now use Pub/Sub anywhere that you run AppScale.

Google is committed to open source solutions and an open cloud. The Pub/Sub emulator is open source, and so are several projects used by the Pub/Sub team.

We're grateful for Google's commitment and contributions to the open source community and would like to specifically thank our friends at Google Cloud for their help:

  • Kir Titievsky
  • Preston Holmes
  • Vic Iglesias
  • Steve Mansfield
  • Miles Ward
