Connector Setup
v1.0.0 (2026-03-24)
How to install and authenticate the c2f connector in your infrastructure, including c2f link, secrets management, and production deployment.
The c2f connector is a Python SDK that your data pipeline uses to push records to the c2f extraction API. This page covers the full setup: provisioning credentials, storing them in your secrets manager, and using them at runtime.
Prerequisites
- Python 3.10+
- Access to the c2f portal (your Gerimedica Google account)
- Your secrets manager CLI available (vault, aws, or gh)

Install the SDK:

    pip install c2f-client

Step 1 — Get a setup token from the portal
- Open the portal → Project Settings → Connectors
- Click New connector
- Copy the setup token — it expires in 15 minutes and can only be used once
Step 2 — Run c2f link
Run this once, on any machine that has access to your secrets manager (dev laptop, ops bastion, or a CI job):

    c2f link --token eyJ...

What happens:

    [1/4] Generating RSA-2048 keypair locally...
    [2/4] Sending public cert to api.c2f.ai...
    [3/4] Receiving signed certificate from c2f CA...
    [4/4] Testing connection...

    ✓ Connected! Connector registered.

      client_id:     xhdupsm691nf1wrxtsfms
      client_secret: dr3JRW...   ← shown once, copy it
      fingerprint:   SHA256:3a4f8c...
      test call:     ✓ api.c2f.ai/health → 200 in 142ms

    Saved:
      ~/.c2f/gerimedica/connector.key   (private key — never leaves this machine)
      ~/.c2f/gerimedica/connector.crt   (signed certificate)
      ~/.c2f/gerimedica/config.json

Step 3 — Push to your secrets manager
The four values that need to go into your secrets manager:
| Key | Secret? |
|---|---|
| client_id | Somewhat (treat like a username) |
| client_secret | Yes |
| cert_b64 | No (public cert, base64 encoded) |
| key_b64 | Yes (private key, base64 encoded) |
Base64-encode the cert and key, then load them into your secrets manager:

    base64 -w0 ~/.c2f/gerimedica/connector.crt   # → cert_b64
    base64 -w0 ~/.c2f/gerimedica/connector.key   # → key_b64

Store client_id, client_secret, cert_b64, and key_b64 as four separate secrets (or a single JSON secret) in whichever secrets manager your infrastructure uses.
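The -w0 flag is specific to GNU coreutils, so a platform-independent way to produce the same values can be useful. The following is a minimal sketch of the single-JSON-secret option using only the Python standard library. The function name build_secret_bundle is hypothetical, and the key names simply mirror the table above.

```python
import base64
import json
from pathlib import Path


def build_secret_bundle(client_id: str, client_secret: str,
                        cert_path: Path, key_path: Path) -> str:
    """Return a JSON string holding the four connector values."""

    def b64(path: Path) -> str:
        # b64encode never inserts line breaks, matching `base64 -w0`.
        return base64.b64encode(path.read_bytes()).decode("ascii")

    return json.dumps({
        "client_id": client_id,
        "client_secret": client_secret,
        "cert_b64": b64(cert_path),
        "key_b64": b64(key_path),
    })
```

The returned string contains the private key, so it is better piped straight into the secrets manager CLI than printed to a terminal or written to disk.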
Once the secrets are stored, the portal connector status changes from 🟡 to 🟢 on the first real API call from production.
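Before that first production call, a preflight check can confirm that the four environment variables read by the SDK are present and well formed. This is a sketch only: check_c2f_env is a hypothetical helper, not part of c2f-client, and it assumes the certificate and key files are PEM-encoded.

```python
import base64
import binascii
import os
from collections.abc import Mapping

REQUIRED_VARS = ("C2F_CLIENT_ID", "C2F_CLIENT_SECRET", "C2F_CERT_B64", "C2F_KEY_B64")


def check_c2f_env(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return a list of problems found; an empty list means the env looks usable."""
    problems = [f"{name} is not set" for name in REQUIRED_VARS if not env.get(name)]
    for name in ("C2F_CERT_B64", "C2F_KEY_B64"):
        value = env.get(name)
        if not value:
            continue  # already reported as missing
        try:
            decoded = base64.b64decode(value, validate=True)
        except binascii.Error:
            problems.append(f"{name} is not valid base64")
            continue
        # Assumption: both files are PEM-encoded, so they start with a BEGIN marker.
        if not decoded.lstrip().startswith(b"-----BEGIN "):
            problems.append(f"{name} does not decode to PEM data")
    return problems
```

Running this at container start and exiting on a non-empty result surfaces a mis-exported secret earlier than a failed handshake would.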
Step 4 — Use in production
Zero-config (reads env vars)

    from c2f import C2FClient

    client = C2FClient.from_env()
    # Reads: C2F_CLIENT_ID, C2F_CLIENT_SECRET, C2F_CERT_B64, C2F_KEY_B64
    # Cert and key are decoded in memory; no temp files are written to disk

Kafka / CDC (Debezium)
Don’t submit one record per message — C2F is a batch API and blocking per message caps throughput at ~0.5 records/s. Use a time-windowed buffer instead.
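The flush rule can be looked at in isolation before it is embedded in a consumer loop. The class below is a minimal sketch of a size-or-time buffer. WindowedBuffer and its method names are hypothetical, and the clock is injectable only so the behaviour can be tested without sleeping.

```python
import time
from collections.abc import Callable


class WindowedBuffer:
    """Collect items and report when a size or age threshold is reached."""

    def __init__(self, max_items: int, max_age_s: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._max_items = max_items
        self._max_age_s = max_age_s
        self._clock = clock
        self._items: list[dict] = []
        self._last_flush = clock()

    def add(self, item: dict) -> None:
        self._items.append(item)

    def should_flush(self) -> bool:
        if not self._items:
            return False  # never flush an empty buffer
        too_big = len(self._items) >= self._max_items
        too_old = self._clock() - self._last_flush >= self._max_age_s
        return too_big or too_old

    def drain(self) -> list[dict]:
        """Return the buffered items and start a new window."""
        items, self._items = self._items, []
        self._last_flush = self._clock()
        return items
```

The window is measured from the previous flush rather than from the first buffered item, so after a quiet period the next record is flushed almost immediately. The consumer loop in this section applies the same rule inline.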

    from __future__ import annotations

    import json
    import time

    from c2f import C2FClient
    from confluent_kafka import Consumer, Producer

    BATCH_SIZE = 100      # flush when buffer reaches this many records
    BATCH_WINDOW_S = 10   # or after this many seconds, whichever comes first

    client = C2FClient.from_env()
    consumer = Consumer({
        "bootstrap.servers": "...",
        "group.id": "c2f-enrichment",
        "enable.auto.commit": False,  # offsets are committed manually below
        # ... remaining consumer settings
    })
    producer = Producer({"bootstrap.servers": "..."})

    consumer.subscribe(["ehr.records.raw"])

    buffer: list[dict] = []
    last_flush = time.monotonic()

    while True:
        msg = consumer.poll(timeout=1.0)

        # Debezium CDC envelope: {"op": "c"/"u"/"d", "after": {...}}
        # Tombstone messages carry no value and are skipped.
        if msg and not msg.error() and msg.value() is not None:
            event = json.loads(msg.value())
            if event.get("op") in ("c", "u") and event.get("after"):
                record = event["after"]
                buffer.append({
                    "record_id": record["uuid"],  # message key = natural idempotency boundary
                    "text": record["clinical_text"],
                })

        should_flush = len(buffer) >= BATCH_SIZE or (
            buffer and time.monotonic() - last_flush >= BATCH_WINDOW_S
        )

        if should_flush:
            try:
                result = client.submit_batch(
                    buffer, question_set="gerimedica-standard-v1"
                ).wait(timeout=300)

                for item in result.items:
                    producer.produce(
                        "ehr.records.enriched",
                        key=item.record_id,
                        value=json.dumps(item.model_dump()),
                    )
                producer.flush()
                consumer.commit()  # only after enriched records are durably produced

            except Exception as exc:
                # Failed batch → DLQ; don't let one bad batch stall the stream
                producer.produce(
                    "ehr.records.c2f-dlq",
                    value=json.dumps({"records": buffer, "error": str(exc), "ts": time.time()}),
                )
                producer.flush()
                consumer.commit()

            buffer = []
            last_flush = time.monotonic()

Key design decisions:
- Batch by window, not per message: accumulate up to 100 records or 10 seconds, whichever comes first
- record_id = Debezium message key: C2F deduplicates on record_id, so re-processing a partition is safe
- Commit after downstream produce: offsets are committed only after enriched records land in ehr.records.enriched
- DLQ instead of crash loop: a bad batch goes to ehr.records.c2f-dlq with full context; the stream keeps moving
Kedro pipeline
In Kedro, external I/O belongs in a dataset, not a node. Nodes are pure functions — wrap C2F as an AbstractDataset so the catalog owns the connection and credentials.

    # src/<project>/datasets/c2f_dataset.py
    from __future__ import annotations

    from typing import Any

    from kedro.io import AbstractDataset
    from c2f import C2FClient


    class C2FDataset(AbstractDataset):
        """Write-only dataset that submits records to the C2F extraction API."""

        def __init__(self, question_set: str) -> None:
            self._question_set = question_set

        def _load(self) -> Any:
            raise NotImplementedError("C2FDataset is a sink; use get_answers() to read results")

        def _save(self, records: list[dict]) -> None:
            client = C2FClient.from_env()
            client.submit_batch(records, question_set=self._question_set).wait()

        def _describe(self) -> dict[str, Any]:
            return {"question_set": self._question_set}

Register in the catalog and wire into a pipeline node:

    # conf/base/catalog.yml
    c2f_enrichment_input:
      type: <project>.datasets.c2f_dataset.C2FDataset
      question_set: gerimedica-standard-v1

    # pipeline.py: the node is a pure function; C2F I/O is the catalog's job
    import pandas as pd
    from kedro.pipeline import Pipeline, node


    def prepare_records(raw_ehr: pd.DataFrame) -> list[dict]:
        return [
            {"record_id": str(row.uuid), "text": row.clinical_text}
            for _, row in raw_ehr.iterrows()
        ]


    pipeline = Pipeline([
        node(prepare_records, inputs="raw_ehr", outputs="c2f_enrichment_input"),
    ])

Kubernetes with Vault Agent
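The Vault Agent sidecar renders the secrets as export statements into a file (by default /vault/secrets/c2f for the annotations below). Source that file in your container entrypoint before starting the application so the values become environment variables: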

    annotations:
      vault.hashicorp.com/agent-inject: "true"
      vault.hashicorp.com/agent-inject-secret-c2f: "secret/c2f/gerimedica"
      vault.hashicorp.com/agent-inject-template-c2f: |
        {{- with secret "secret/c2f/gerimedica" -}}
        export C2F_CLIENT_ID={{ .Data.data.client_id }}
        export C2F_CLIENT_SECRET={{ .Data.data.client_secret }}
        export C2F_CERT_B64={{ .Data.data.cert_b64 }}
        export C2F_KEY_B64={{ .Data.data.key_b64 }}
        {{- end }}

Guided setup with Claude Code
If you use Claude Code, you can paste the following instruction into your session and it will walk you through the entire setup interactively:

    I need to set up the c2f connector for our Gerimedica integration.

    Context:
    - c2f-client SDK: pip install c2f-client
    - Provisioning tool: c2f link --token <setup-token>
    - Docs: https://docs.c2f.ai/clients/gerimedica/connector-setup

    Please guide me through the full setup:
    1. Check that c2f-client is installed (c2f --version). Install if missing.
    2. Ask me which secrets manager we use (Vault / AWS / GitHub Secrets / other).
    3. Walk me through getting a setup token from the portal.
    4. Run c2f link --token <token> and check the output with --json for diagnostics.
    5. Show me how to base64-encode the cert and key, then load them into our secrets manager.
    6. Verify the connection by running c2f whoami.
    7. Show me the production usage pattern for our stack (ask what we use: Kafka / Kedro / Airflow / plain Python).

    If any step fails, read the error JSON carefully and diagnose before retrying.
    Use c2f <command> --json for all diagnostic output.

Connector status in the portal
| Status | Meaning |
|---|---|
| ⚫ No connector | No setup token generated yet |
| 🟡 Registered | c2f link succeeded, no production calls yet |
| 🟢 Active | First authenticated API call received |
| 🔴 Error | Cert expired or credentials revoked — re-run c2f link |
Troubleshooting

    # Verify your local config
    c2f whoami

    # Test the connection manually
    c2f ping

    # All commands support --json for machine-readable output
    c2f link --token eyJ... --json
    c2f whoami --json

| Error | Likely cause | Fix |
|---|---|---|
| token expired | Setup token > 15 min old | Generate new token in portal |
| token already used | Token consumed by a previous run | Generate new token |
| cert validation failed | Clock skew or revoked cert | Check system time; re-run c2f link |
| mTLS handshake failed | Wrong cert/key pair | Re-run c2f link on same machine |
| 401 Unauthorized | Wrong client_secret | Re-run c2f link, re-export to secrets manager |