# Enabling OAuth2 with Python streaming applications

This post demonstrates how to utilize OAuth2 in [Faust Streaming](https://faust-streaming.github.io/faust) applications as an alternative authorization flow compared to using static API credentials.

Faust is a streaming library for Python. It provides stream/event processing primitives *a la* Kafka Streams to process Kafka messages in Python.

Organizations are utilizing OAuth2 for managing federated identities across service boundaries a centralized manner. With the introduction of the `OAUTHBEARER` SASL mechanism in Kafka 2.0.0, both brokers and clients can be configured to use an external identity provider for authentication, making it easier to manage identities than span across systems.

# Authorization in Apache Kafka

Apache Kafka provides an Authorization system based on Access Control Lists (ACLs). Kafka ACLs are defined in the general format of "Principal P is \[Allowed/Denied\] Operation O From Host H On Resource R". You can read more about the ACL structure on [KIP-11](https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface).

In addition to SSL encryption, Kafka supports multiple *authorization mechanisms* via the Simple Authentication and Security Layer (SASL) to enable authentication via third-party servers. This enables Kafka clusters to utilize industry-standard identity providers for all broker and client authentication requests.

Confluent Cloud currently supports the following authorization mechanisms:

* `GSSAPI` (Kerberos)
    
* `PLAIN` (Username/Password)
    
* `SCRAM-SHA` (Zookeeper)
    
* `OAUTHBEARER` (OAuth server)
    

See the Confluent documentation on [Enabling SASL SSL for Kafka](https://developer.confluent.io/learn-kafka/security/authentication-ssl-and-sasl-ssl/#enabling-sasl-ssl-for-kafka) for more information on the different authorization models supported.

The `OAUTHBEARER` security mechanism enables a Kafka cluster to utilize a third-party identity provider for authentication. In the case of Confluent Cloud, setting up an external identity provider is very straight forward, assuming you're using an OIDC-compliant identity provider (e.g. Azure AD, Okta, Keycloak). See the [documentation](https://docs.confluent.io/cloud/current/access-management/authenticate/oauth/identity-providers.html) for more information

# Authorization in Faust Streaming

Here's an example of a streaming application demonstrating how to connect to a Kafka broker over `PLAINTEXT`, essentially anonymous and unencrypted. Our client is assuming all messages adhere to the `Order` schema.

```python
import faust

app = faust.App("myapp", broker="kafka://localhost")

class Order(faust.Record):
    account_id: str
    amount: int

@app.agent(value_type=Order)
async def order(orders):
    async for order in orders:
        print(f"Order for {order.account_id}: {order.amount}")
```

Faust recently introduced a new authorization mechanism to support `OAUTHBEARER` authentication in `v1.5.0`, enabling Faust Streaming workers to authenticate to a Kafka broker configured with an identity provider using OAuth2 Bearer tokens.

Using `OAUTHBEARER` broker credentials requires that we setup at least a default SSL context, and provide an instance of `AbstractTokenProvider` to the [`faust.App`](http://faust.App) during configuration. The new `faust.OAuthCredentials` class supports a single `oauth_cb` attribute for an instance of `AbstractTokenProvider`, which is a class with a single asynchronous method for retrieving the bearer token. Clients are responsible for managing the entire token life cycle, such as handling token refreshes, etc.

```python
import faust
from aiokafka.conn import AbstractTokenProvider
from aiokafka.helpers import create_ssl_context

class CustomTokenProvider(AbstractTokenProvider):
    _token: Optional[AccessToken] = None

    def __init__(self, cluster_id: str, pool_id: str):
        self.cluster_id = cluster_id
        self.pool_id = pool_id

    async def token(self):
        return await self.get_token()

    async def get_token(self):
        if self._token is None or (self._token.expires_on - time.time()) < 5 * 60:
            logger.info("token is expired, refreshing...")
            self._token = await self._get_token()
        else:
            logger.info("Current bearer token is valid")

        return self._token.token

    async def _get_token(self):
        logger.info("Generating token")
        credential = DefaultAzureCredential()
        async with credential as cred:
            token = await cred.get_token(settings.KAFKA_OAUTH2_SCOPE)
        logger.info("Token expires: %s", dt.datetime.fromtimestamp(token.expires_on))
        return token

    def extensions(self):
        return {
            "logicalCluster": self.cluster_id,
            "identityPoolId": self.pool_id,
        }


broker_credentials = faust.OAuthCredentials(
    oauth_cb=CustomTokenProvider(    
        cluster_id=settings.KAFKA_CLUSTER_ID,
        pool_id=settings.KAFKA_POOL_ID,
    ),
    ssl_context=create_ssl_context()
)


app = faust.App(
    "myapp",
    broker=KAFKA_BROKER,
    broker_credentials=broker_credentials,
)

# Setting up Model and Agents same as above.
# ...
```
