Chris Coutinho

This post demonstrates how to use OAuth2 in Faust Streaming applications. Faust is a stream-processing library for Python: it provides stream/event processing primitives à la Kafka Streams for processing Kafka messages.

Organizations are adopting OAuth2 to manage federated identities across service boundaries in a centralized manner. With the introduction of the OAUTHBEARER SASL mechanism in Kafka 2.0.0, both brokers and clients can be configured to use an external identity provider for authentication, making it easier to manage identities that span multiple systems.

Authorization in Kafka

Apache Kafka provides an authorization system based on Access Control Lists (ACLs). Kafka ACLs are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H On Resource R" (for example, "Principal User:alice is Allowed Operation Read From Host * On Resource Topic:orders"). You can read more about the ACL structure in KIP-11.

In addition to SSL encryption, Kafka supports multiple authentication mechanisms via the Simple Authentication and Security Layer (SASL), enabling authentication against third-party servers. This allows Kafka clusters to use industry-standard identity providers for all broker and client authentication requests.

Confluent Platform currently supports the following SASL authentication mechanisms:

  • GSSAPI (Kerberos)
  • PLAIN (Username/Password)
  • SCRAM-SHA-256/512 (credentials stored in ZooKeeper)
  • OAUTHBEARER (OAuth server)

See the Confluent documentation on Enabling SASL SSL for Kafka for more information.

The OAUTHBEARER security mechanism enables a Kafka cluster to use a third-party identity provider for authentication. In the case of Confluent Cloud, setting up an external identity provider is straightforward, assuming you're using an OIDC-compliant identity provider (e.g. Azure AD, Okta, Keycloak). See the documentation for more information.

OAuth2/OIDC Authorization
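
OAuth2 is an industry-standard authorization framework (RFC 6749) in which a client obtains scoped access tokens from an authorization server. OpenID Connect (OIDC) is an identity layer built on top of OAuth2 that standardizes how identity providers issue and expose those tokens. Kafka's OAUTHBEARER mechanism authenticates clients to brokers using the bearer access tokens issued by such a provider.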

Client Credentials flow
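
Service-to-service clients such as Kafka producers and consumers typically use the client credentials flow (RFC 6749, section 4.4): the client authenticates directly against the identity provider's token endpoint with its client ID and secret, and receives a short-lived bearer access token in return. The sketch below shows such a token request using aiohttp (a library Faust already depends on); fetch_token is a hypothetical helper, and token_url, client_id, and client_secret are placeholders for values issued by your identity provider.

import aiohttp

async def fetch_token(token_url: str, client_id: str, client_secret: str):
    # Exchange client credentials for a bearer token (RFC 6749, section 4.4).
    async with aiohttp.ClientSession() as session:
        async with session.post(
            token_url,
            data={
                "grant_type": "client_credentials",
                "client_id": client_id,
                "client_secret": client_secret,
            },
        ) as response:
            response.raise_for_status()
            payload = await response.json()
    # Standard token responses include the token itself and its lifetime in seconds.
    return payload["access_token"], payload["expires_in"]

This is the token a Faust worker will later present to the broker over OAUTHBEARER.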

OAuth2 in Faust Streaming

Here's an example of a streaming application demonstrating how to connect to a Kafka broker over PLAINTEXT, i.e. anonymously and unencrypted. The client assumes all messages adhere to the Order schema.

import faust

app = faust.App("myapp", broker="kafka://localhost")

# Models describe how message values are serialized:
class Order(faust.Record):
    account_id: str
    amount: int

# Agents process the infinite stream of orders as they arrive:
@app.agent(value_type=Order)
async def order(orders):
    async for order in orders:
        print(f"Order for {order.account_id}: {order.amount}")

Faust introduced support for the OAUTHBEARER authentication mechanism in v1.5.0, enabling Faust Streaming workers to authenticate to a Kafka broker configured with an identity provider using OAuth2 bearer tokens.

Using OAUTHBEARER broker credentials requires that we set up at least a default SSL context and provide an instance of AbstractTokenProvider to the faust.App during configuration. The new faust.OAuthCredentials class takes a single oauth_cb argument holding an instance of AbstractTokenProvider, an abstract class with a single asynchronous token() method for retrieving the bearer token. Clients are responsible for managing the entire token life cycle, such as handling token refreshes.

import faust
from aiokafka.abc import AbstractTokenProvider
from aiokafka.helpers import create_ssl_context

# Broker URL; replace with your cluster's address.
KAFKA_BROKER = "kafka://localhost:9092"

class CustomTokenProvider(AbstractTokenProvider):
    async def token(self):
        # Return a valid OAuth2 bearer token as a string.
        ...

broker_credentials = faust.OAuthCredentials(
    oauth_cb=CustomTokenProvider(),
    ssl_context=create_ssl_context(),
)

app = faust.App(
    "myapp",
    broker=KAFKA_BROKER,
    broker_credentials=broker_credentials,
)

# Setting up Model and Agents same as above.
# ...
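
To tie the pieces together, here is a minimal sketch of a token provider that manages the token life cycle itself, caching the token and refreshing it shortly before expiry. It assumes the hypothetical fetch_token helper from the client credentials sketch above; the class name and the 30-second refresh margin are illustrative, not part of the Faust or aiokafka API.

import time

from aiokafka.abc import AbstractTokenProvider

class ClientCredentialsTokenProvider(AbstractTokenProvider):
    def __init__(self, token_url, client_id, client_secret):
        self.token_url = token_url
        self.client_id = client_id
        self.client_secret = client_secret
        self._token = None
        self._expires_at = 0.0

    async def token(self):
        # Fetch a new token on first use, or when the cached one is
        # within 30 seconds of expiring.
        if self._token is None or time.monotonic() >= self._expires_at - 30:
            access_token, expires_in = await fetch_token(
                self.token_url, self.client_id, self.client_secret
            )
            self._token = access_token
            self._expires_at = time.monotonic() + expires_in
        return self._token

An instance of this provider can then be passed as the oauth_cb argument to faust.OAuthCredentials, exactly as in the example above.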