Web APIs I/O connector
The Beam SDKs include a built-in transform called RequestResponseIO that supports reads from and writes to Web APIs such as REST or gRPC.
The discussion below focuses on the Java SDK. Python examples will be added in the future; see tracker issue: #30422. Additionally, support for the Go SDK is not yet available; see tracker issue: #30423.
RequestResponseIO Features
Features this transform provides include:
- developers provide minimal code that invokes the Web API endpoint
- the transform handles request retries and exponential backoff
- optional caching of request and response associations
- optional metrics
This guide currently focuses on the first two bullet points above: the minimal code requirements and error handling. In the future, it may be expanded to show examples of additional features. Links to additional resources are provided below.
Additional resources
Before you start
To use RequestResponseIO, add the dependency to your Gradle build.gradle(.kts) or Maven pom.xml file. See Maven Central for available versions.
The example below adds the Beam BOM and related dependencies, such as Beam core, to your build.gradle(.kts) file.
// Apache Beam BOM
// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-bom
implementation(platform("org.apache.beam:beam-sdks-java-bom:2.62.0"))
// Beam Core SDK
// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-core
implementation("org.apache.beam:beam-sdks-java-core")
// RequestResponseIO dependency
// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-io-rrio
implementation("org.apache.beam:beam-sdks-java-io-rrio")
Or, using Maven, add the artifact dependency to your pom.xml file.
RequestResponseIO basics
Minimal code
The minimal steps needed to read from or write to Web APIs are:
- Implement the Caller.
- Instantiate RequestResponseIO.
Implementing the Caller
Caller requires only one method override: call, whose purpose is to interact with the API, converting a request into a response. The transform’s DoFn invokes this method within its DoFn.ProcessElement method. The transform handles everything else including repeating failed requests and exponential backoff (discussed more below).
// MyCaller invokes a Web API with MyRequest and returns the resulting MyResponse.
class MyCaller implements Caller<MyRequest, MyResponse> {
@Override
public MyResponse call(MyRequest request) throws UserCodeExecutionException {
// Do something with request and return the response.
}
}
Instantiate RequestResponseIO
Using RequestResponseIO is as simple as shown below. As mentioned, it minimally requires two parameters: the Caller and the expected Coder of the response. (Note: If the concept of a Beam Coder is new to you, please see the Apache Beam Programming Guide on this subject. This guide also has an example below.)
The RequestResponseIO transform returns a Result that bundles any failures and the PCollection of successful responses. In Beam, we call this the additional outputs pattern, which typically requires a bit of boilerplate, but the transform takes care of it for you. Using the transform, you get the success and failure PCollections via Result::getFailures and Result::getResponses.
The abbreviated snippet below shows how the transform may work in your pipeline.
// Step 1. Define the Coder for the response.
Coder<MyResponse> responseCoder = ...
// Step 2. Build the request PCollection.
PCollection<MyRequest> requests = ...
// Step 3. Instantiate the RequestResponseIO with the Caller and Coder and apply it to the request PCollection.
Result<MyResponse> result = requests.apply(RequestResponseIO.of(new MyCaller(), responseCoder));
// Step 4a. Do something with the responses.
result.getResponses().apply( ... );
// Step 4b. Apply failures to a dead letter sink.
result.getFailures().apply( ... );
RequestResponseIO takes care of everything else needed to invoke the Caller for each request. It doesn’t care what you do inside your Caller, whether you make raw HTTP calls or use client code. Later this guide discusses the advantage of this design for testing.
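To make the success and failure split concrete, here is a minimal standalone sketch that uses only the JDK and no Beam classes. The names ResultSketch, SketchResult, and callAll are hypothetical and exist only for illustration; the real transform works on PCollections and emits ApiIOError records rather than strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Standalone sketch of the success/failure split; it does not use any Beam classes. */
final class ResultSketch {
  /** Hypothetical stand-in for the transform's Result: responses plus failure messages. */
  static final class SketchResult<T> {
    final List<T> responses = new ArrayList<>();
    final List<String> failures = new ArrayList<>();
  }

  /** Invokes the caller once per request, routing each outcome to one of the two outputs. */
  static <ReqT, RespT> SketchResult<RespT> callAll(
      List<ReqT> requests, Function<ReqT, RespT> caller) {
    SketchResult<RespT> result = new SketchResult<>();
    for (ReqT request : requests) {
      try {
        result.responses.add(caller.apply(request));
      } catch (RuntimeException e) {
        // A failed call does not stop the loop; it is recorded for later inspection.
        result.failures.add(request + ": " + e.getMessage());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // "two" cannot be parsed, so it lands in the failures list.
    SketchResult<Integer> result = callAll(List.of("1", "two", "3"), Integer::parseInt);
    System.out.println("responses=" + result.responses);
    System.out.println("failures=" + result.failures);
  }
}
```

The point of the sketch is only that one bad request does not abort the others: every outcome ends up in exactly one of the two outputs.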
API call repeats and failures
As mentioned above, RequestResponseIO returns a Result that bundles both the success and failure PCollections resulting from your Caller. This section provides a little more detail about handling failures and specifics on API call repeats with backoff.
Handling failures
The failures are an ApiIOError PCollection that you may apply to a logging transform or a transform that saves the errors to a downstream sink for later analysis and troubleshooting.
Since ApiIOError is already mapped to a Beam Schema, it is compatible with most of Beam’s existing I/O connectors. (Note: If the concept of Beam Schemas is new to you, please see the Beam Programming Guide.)
For example, you can easily send ApiIOError records to BigQuery for analysis and troubleshooting, as shown below, without first converting the records to a TableRow.
static void writeFailuresToBigQuery(
PCollection<ApiIOError> failures,
TableReference tableReference,
BigQueryIO.Write.CreateDisposition createDisposition,
BigQueryIO.Write.WriteDisposition writeDisposition) {
// PCollection<ApiIOError> failures = ...
// TableReference tableReference = ...
// BigQueryIO.Write.CreateDisposition createDisposition = ...
// BigQueryIO.Write.WriteDisposition writeDisposition = ...
failures.apply(
"Dead letter",
BigQueryIO.<ApiIOError>write()
.useBeamSchema()
.to(tableReference)
.withCreateDisposition(createDisposition)
.withWriteDisposition(writeDisposition));
}
API call repeats and backoff
Prior to emitting to the failure PCollection, the transform performs a retry for certain errors after a prescribed exponential backoff. Your Caller must throw specific errors to signal the transform to perform the retry with backoff. Throwing a UserCodeExecutionException will immediately emit the error into the ApiIOError PCollection.
RequestResponseIO will attempt a retry with backoff when Caller throws:
- UserCodeQuotaException
- UserCodeRemoteSystemException
- UserCodeTimeoutException
After a threshold number of retries, the error is emitted into the failure PCollection.
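The general idea of retrying with exponential backoff can be sketched with the JDK alone. This is not RequestResponseIO's actual implementation: the class BackoffSketch, the RetryableException type, and the delay values (10 ms initial, 1000 ms cap) are all assumptions chosen for illustration.

```java
import java.util.concurrent.Callable;

/** Standalone sketch of retry-with-backoff; not RequestResponseIO's actual implementation. */
final class BackoffSketch {
  /** Hypothetical stand-in for an exception type that signals "please retry". */
  static class RetryableException extends Exception {
    RetryableException(String message) {
      super(message);
    }
  }

  /** Delay before retry number attempt (0-based): initial * 2^attempt, capped at max. */
  static long delayMillis(int attempt, long initialMillis, long maxMillis) {
    long delay = initialMillis << Math.min(attempt, 30);
    return Math.min(delay, maxMillis);
  }

  /**
   * Calls the task, retrying on RetryableException up to maxRetries times. Any other
   * exception propagates immediately, like a non-retryable error going straight to failures.
   */
  static <T> T callWithRetries(Callable<T> task, int maxRetries) throws Exception {
    for (int attempt = 0; ; attempt++) {
      try {
        return task.call();
      } catch (RetryableException e) {
        if (attempt >= maxRetries) {
          throw e; // Threshold reached: give up and surface the error.
        }
        Thread.sleep(delayMillis(attempt, 10, 1000));
      }
    }
  }

  public static void main(String[] args) throws Exception {
    int[] calls = {0};
    // The task fails twice with a retryable error, then succeeds on the third call.
    String response =
        callWithRetries(
            () -> {
              if (++calls[0] < 3) {
                throw new RetryableException("quota exceeded");
              }
              return "ok";
            },
            5);
    System.out.println(response + " after " + calls[0] + " calls");
  }
}
```

Doubling the delay on each attempt gives a struggling service progressively more time to recover, while the cap keeps a single element from waiting indefinitely.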
Testing
Since RequestResponseIO doesn’t care what you do inside your Caller implementation, some testing becomes more convenient. Instead of relying on direct calls to a real API within some tests, and consequently depending on your external resource, you simply implement a version of your Caller that returns responses or throws exceptions according to your test logic. For example, if you want to test a downstream step in your pipeline for a specific response, say empty records, you could easily do so via the following. For more information on testing your Beam Pipelines, see the Beam Programming Guide.
@Test
void givenEmptyResponse_thenExpectSomething() {
// Test expects PTransform underTest should do something as a result of empty records, for example.
PTransform<PCollection<Iterable<String>>, ?> underTest = ...
PCollection<String> requests = pipeline.apply(Create.of("aRequest"));
IterableCoder<String> coder = IterableCoder.of(StringUtf8Coder.of());
Result<Iterable<String>> result = requests.apply(RequestResponseIO.of(new MockEmptyIterableResponse(), coder));
PAssert.that(result.getResponses().apply(underTest)).containsInAnyOrder(...);
pipeline.run();
}
// MockEmptyIterableResponse simulates when there are no results from the API.
class MockEmptyIterableResponse implements Caller<String, Iterable<String>> {
@Override
public Iterable<String> call(String request) throws UserCodeExecutionException {
return Collections.emptyList();
}
}
Practical examples
The two examples below come together in an end-to-end Beam pipeline. The goal of this pipeline is to download images and use Gemini on Vertex AI to recognize the image content.
Note that this example does not replace our current AI/ML solutions. Please see Get started with AI/ML pipelines for more details on using Beam with AI/ML.
Working with HTTP calls directly
We first need to download images. To do so, we need to make HTTP calls to the image URL and emit their content into a PCollection for use with the Gemini API. The value of this example on its own is that it demonstrates how to use RequestResponseIO to make raw HTTP requests.
Define Caller
We implement the Caller, the HttpImageClient, that receives an ImageRequest and returns an ImageResponse. For demo purposes, the example wraps both in a KV so that the raw URL is preserved as the key of the returned KV containing the ImageResponse.
Abbreviated snippet
Below is an abbreviated version of the HttpImageClient showing the important parts.
class HttpImageClient implements Caller<KV<String, ImageRequest>, KV<String, ImageResponse>> {
private static final HttpRequestFactory REQUEST_FACTORY =
new NetHttpTransport().createRequestFactory();
@Override
public KV<String, ImageResponse> call(KV<String, ImageRequest> requestKV) throws UserCodeExecutionException {
ImageRequest request = requestKV.getValue();
GenericUrl url = new GenericUrl(request.getImageUrl());
HttpRequest imageRequest = REQUEST_FACTORY.buildGetRequest(url);
HttpResponse response = imageRequest.execute();
return KV.of(
requestKV.getKey(),
ImageResponse
.builder()
// Build ImageResponse from HttpResponse
.build()
);
}
}
Full example
The full implementation, shown below, throws different exceptions based on the HTTP response code.
/**
* Implements {@link Caller} to process an {@link ImageRequest} into an {@link ImageResponse} by
* invoking the HTTP request.
*/
class HttpImageClient implements Caller<KV<String, ImageRequest>, KV<String, ImageResponse>> {
private static final int STATUS_TOO_MANY_REQUESTS = 429;
private static final int STATUS_TIMEOUT = 408;
private static final HttpRequestFactory REQUEST_FACTORY =
new NetHttpTransport().createRequestFactory();
static HttpImageClient of() {
return new HttpImageClient();
}
/**
* Invokes an HTTP GET request from the {@code requestKV}, returning an {@link ImageResponse}
* containing the image data.
*/
@Override
public KV<String, ImageResponse> call(KV<String, ImageRequest> requestKV)
throws UserCodeExecutionException {
String key = requestKV.getKey();
ImageRequest request = requestKV.getValue();
Preconditions.checkArgument(request != null);
GenericUrl url = new GenericUrl(request.getImageUrl());
try {
HttpRequest imageRequest = REQUEST_FACTORY.buildGetRequest(url);
HttpResponse response = imageRequest.execute();
if (response.getStatusCode() >= 500) {
// Tells transform to repeat the request.
throw new UserCodeRemoteSystemException(response.getStatusMessage());
}
if (response.getStatusCode() >= 400) {
switch (response.getStatusCode()) {
case STATUS_TOO_MANY_REQUESTS:
// Tells transform to repeat the request.
throw new UserCodeQuotaException(response.getStatusMessage());
case STATUS_TIMEOUT:
// Tells transform to repeat the request.
throw new UserCodeTimeoutException(response.getStatusMessage());
default:
// Tells the transform to emit immediately into failure PCollection.
throw new UserCodeExecutionException(response.getStatusMessage());
}
}
InputStream is = response.getContent();
byte[] bytes = ByteStreams.toByteArray(is);
return KV.of(
key,
ImageResponse.builder()
.setMimeType(request.getMimeType())
.setData(ByteString.copyFrom(bytes))
.build());
} catch (IOException e) {
// Tells the transform to emit immediately into failure PCollection.
throw new UserCodeExecutionException(e);
}
}
}
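The status-code handling in the example above boils down to a small decision table. The standalone sketch below restates it with the JDK only; StatusClassifierSketch and its Action enum are hypothetical names, and the comments note which exception the example throws in each case.

```java
/** Standalone sketch: maps an HTTP status code to the action taken in the example above. */
final class StatusClassifierSketch {
  enum Action {
    RETURN_RESPONSE,
    RETRY_WITH_BACKOFF,
    EMIT_FAILURE
  }

  static Action classify(int statusCode) {
    if (statusCode >= 500) {
      return Action.RETRY_WITH_BACKOFF; // Example throws UserCodeRemoteSystemException.
    }
    if (statusCode == 429) {
      return Action.RETRY_WITH_BACKOFF; // Example throws UserCodeQuotaException.
    }
    if (statusCode == 408) {
      return Action.RETRY_WITH_BACKOFF; // Example throws UserCodeTimeoutException.
    }
    if (statusCode >= 400) {
      return Action.EMIT_FAILURE; // Example throws UserCodeExecutionException.
    }
    return Action.RETURN_RESPONSE; // Anything below 400 is treated as a usable response.
  }

  public static void main(String[] args) {
    for (int code : new int[] {200, 404, 408, 429, 503}) {
      System.out.println(code + " -> " + classify(code));
    }
  }
}
```

Server errors, quota errors, and timeouts are usually transient, which is why they are worth retrying, whereas most other 4xx responses will fail the same way no matter how often the request is repeated.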
Define request
ImageRequest is the custom request we provide the HttpImageClient, defined in the example above, to invoke the HTTP call that acquires the image. This example happens to use Google AutoValue, but you can use any custom Serializable Java class as you would in any Beam PCollection, including built-in Java classes such as String, Double, etc. For convenience, this example uses @DefaultSchema(AutoValueSchema.class), allowing us to map our custom type to a Beam Schema automatically based on its getters.
/** An HTTP request for an image. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class ImageRequest implements Serializable {
static final TypeDescriptor<ImageRequest> TYPE = TypeDescriptor.of(ImageRequest.class);
private static final Map<String, String> EXT_MIMETYPE_MAP =
ImmutableMap.of(
"jpg", "image/jpeg",
"jpeg", "image/jpeg",
"png", "image/png");
/** Derive the MIME type of the image from the url based on its extension. */
private static String mimeTypeOf(String url) {
String ext = FileNameUtils.getExtension(url);
if (!EXT_MIMETYPE_MAP.containsKey(ext)) {
throw new IllegalArgumentException(
String.format("could not map extension to mimetype: ext %s of url: %s", ext, url));
}
return EXT_MIMETYPE_MAP.get(ext);
}
static Builder builder() {
return new AutoValue_ImageRequest.Builder();
}
/** Build an {@link ImageRequest} from a {@code url}. */
static ImageRequest of(String url) {
return builder().setImageUrl(url).setMimeType(mimeTypeOf(url)).build();
}
/** The URL of the image request. */
abstract String getImageUrl();
/** The MIME type of the image request. */
abstract String getMimeType();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setImageUrl(String value);
abstract Builder setMimeType(String value);
abstract ImageRequest build();
}
}
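The extension-to-MIME-type lookup in ImageRequest can be tried out without any third-party library. The sketch below is a JDK-only approximation: MimeTypeSketch and extensionOf are hypothetical names, the extension is found with plain string operations rather than FileNameUtils, and lowercasing the extension is an added assumption that the original class does not make.

```java
import java.util.Locale;
import java.util.Map;

/** Standalone sketch of deriving a MIME type from a URL's file extension. */
final class MimeTypeSketch {
  private static final Map<String, String> EXT_MIMETYPE_MAP =
      Map.of(
          "jpg", "image/jpeg",
          "jpeg", "image/jpeg",
          "png", "image/png");

  /** Returns the text after the last '.' in the final path segment, or "" if there is none. */
  static String extensionOf(String url) {
    int slash = url.lastIndexOf('/');
    int dot = url.lastIndexOf('.');
    return dot > slash ? url.substring(dot + 1).toLowerCase(Locale.ROOT) : "";
  }

  /** Looks up the MIME type, rejecting extensions that are not in the map. */
  static String mimeTypeOf(String url) {
    String ext = extensionOf(url);
    String mimeType = EXT_MIMETYPE_MAP.get(ext);
    if (mimeType == null) {
      throw new IllegalArgumentException(
          String.format("could not map extension to mimetype: ext %s of url: %s", ext, url));
    }
    return mimeType;
  }

  public static void main(String[] args) {
    System.out.println(
        mimeTypeOf("https://storage.googleapis.com/generativeai-downloads/images/cake.jpg"));
  }
}
```

Failing fast on an unknown extension surfaces a bad URL when the request is built, before any HTTP call is made.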
Define response
ImageResponse is the custom response we return from the HttpImageClient, defined in the example above, that contains the image data as a result of calling the remote server with the image URL. Again, this example happens to use Google AutoValue, but you can use any custom Serializable Java class as you would in any Beam PCollection, including built-in Java classes such as String, Double, etc.
/** An HTTP response of an image request. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class ImageResponse implements Serializable {
static Builder builder() {
return new AutoValue_ImageResponse.Builder();
}
/** The MIME type of the response payload. */
abstract String getMimeType();
/** The payload of the response containing the image data. */
abstract ByteString getData();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setMimeType(String value);
abstract Builder setData(ByteString value);
abstract ImageResponse build();
}
}
Define response coder
RequestResponseIO needs the response’s Coder as its second required parameter, shown in the example below. Please see the Beam Programming Guide for more information about Beam Coders.
/** A {@link CustomCoder} of an {@link ImageResponse}. */
class ImageResponseCoder extends CustomCoder<ImageResponse> {
public static ImageResponseCoder of() {
return new ImageResponseCoder();
}
private static final Coder<byte[]> BYTE_ARRAY_CODER = ByteArrayCoder.of();
private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
@Override
public void encode(ImageResponse value, OutputStream outStream)
throws CoderException, IOException {
BYTE_ARRAY_CODER.encode(value.getData().toByteArray(), outStream);
STRING_CODER.encode(value.getMimeType(), outStream);
}
@Override
public ImageResponse decode(InputStream inStream) throws CoderException, IOException {
byte[] data = BYTE_ARRAY_CODER.decode(inStream);
String mimeType = STRING_CODER.decode(inStream);
return ImageResponse.builder().setData(ByteString.copyFrom(data)).setMimeType(mimeType).build();
}
}
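One detail worth noticing in the coder above is that decode reads the fields in exactly the order encode wrote them. The standalone sketch below demonstrates that round trip with JDK streams only. It does not reproduce the wire format of Beam's ByteArrayCoder or StringUtf8Coder; the length-prefixed layout and the names CoderOrderSketch and Decoded are assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Standalone sketch of an encode/decode round trip; not Beam's actual wire format. */
final class CoderOrderSketch {
  /** Hypothetical holder for the two decoded fields. */
  static final class Decoded {
    final byte[] data;
    final String mimeType;

    Decoded(byte[] data, String mimeType) {
      this.data = data;
      this.mimeType = mimeType;
    }
  }

  /** Writes the payload first (length-prefixed), then the MIME type. */
  static byte[] encode(byte[] data, String mimeType) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(data.length);
    out.write(data);
    out.writeUTF(mimeType);
    out.flush();
    return bytes.toByteArray();
  }

  /** Reads the fields back in the same order they were written. */
  static Decoded decode(byte[] encoded) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
    byte[] data = new byte[in.readInt()];
    in.readFully(data);
    String mimeType = in.readUTF();
    return new Decoded(data, mimeType);
  }

  public static void main(String[] args) throws IOException {
    Decoded decoded = decode(encode(new byte[] {1, 2, 3}, "image/png"));
    System.out.println(decoded.mimeType + ", " + decoded.data.length + " bytes");
  }
}
```

Swapping the two reads in decode would misinterpret the bytes, which is why a round-trip test like this is a cheap safeguard for any custom coder.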
Acquire image data from URLs
The example below brings everything together in an end-to-end pipeline. From a list of image URLs, the example builds the PCollection of ImageRequests that is applied to an instantiated RequestResponseIO, using the HttpImageClient Caller implementation.
Any failures, accessible from the Result’s getFailures getter, are written to logs. As already discussed above, one could write these failures to a database or filesystem.
/** Example demonstrating downloading a list of image URLs using {@link RequestResponseIO}. */
static void readFromGetEndpointExample(List<String> urls, Pipeline pipeline) {
// Pipeline pipeline = Pipeline.create();
// List<String> urls = ImmutableList.of(
// "https://storage.googleapis.com/generativeai-downloads/images/cake.jpg",
// "https://storage.googleapis.com/generativeai-downloads/images/chocolate.png",
// "https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg",
// "https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg",
// "https://storage.googleapis.com/generativeai-downloads/images/factory.png",
// "https://storage.googleapis.com/generativeai-downloads/images/scones.jpg"
// );
// Step 1: Convert the list of URLs to a PCollection of ImageRequests.
PCollection<KV<String, ImageRequest>> requests = Images.requestsOf(urls, pipeline);
// Step 2: RequestResponseIO requires a Coder as its second parameter.
KvCoder<String, ImageResponse> responseCoder =
KvCoder.of(StringUtf8Coder.of(), ImageResponseCoder.of());
// Step 3: Process ImageRequests using RequestResponseIO instantiated from the Caller
// implementation and the expected PCollection response Coder.
Result<KV<String, ImageResponse>> result =
requests.apply(
ImageResponse.class.getSimpleName(),
RequestResponseIO.of(HttpImageClient.of(), responseCoder));
// Step 4: Log any failures to stderr.
result.getFailures().apply("logErrors", Log.errorOf());
// Step 5: Log output to stdout.
Images.displayOf(result.getResponses()).apply("logResponses", Log.infoOf());
}
The pipeline output, shown below, displays a summary of the downloaded image, its URL, mimetype and size.
KV{https://storage.googleapis.com/generativeai-downloads/images/factory.png, mimeType=image/png, size=23130}
KV{https://storage.googleapis.com/generativeai-downloads/images/scones.jpg, mimeType=image/jpeg, size=394671}
KV{https://storage.googleapis.com/generativeai-downloads/images/cake.jpg, mimeType=image/jpeg, size=253809}
KV{https://storage.googleapis.com/generativeai-downloads/images/chocolate.png, mimeType=image/png, size=29375}
KV{https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg, mimeType=image/jpeg, size=207281}
KV{https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg, mimeType=image/jpeg, size=1121752}
Using API client code
The last example demonstrated invoking HTTP requests directly. However, some API services provide client code that one should use within the Caller implementation. Using client code within Beam presents unique challenges, namely serialization. Additionally, some client code requires explicit handling in terms of setup and teardown.
RequestResponseIO can handle an additional interface called SetupTeardown for these scenarios.
The SetupTeardown interface has only two methods, setup and teardown.
The transform calls these setup and teardown methods within its DoFn’s @Setup and @Teardown methods, respectively.
The transform also handles retries with backoff, likewise dependent on the thrown Exception, as discussed previously in this guide.
Define Caller with SetupTeardown
Below is an example that adapts the Vertex AI Gemini Java Client to work in a Beam pipeline using RequestResponseIO, adding usage of the SetupTeardown interface in addition to the required Caller. It has a bit more boilerplate than the simple HTTP example above.
Abbreviated snippet
An abbreviated snippet showing the important parts is shown below.
The setup method is where the GeminiAIClient instantiates VertexAI and GenerativeModel, finally closing VertexAI during teardown. Finally, its call method looks similar to the HTTP example above, where it takes a request, uses it to invoke an API, and returns the response.
class GeminiAIClient implements
Caller<KV<String, GenerateContentRequest>, KV<String, GenerateContentResponse>>,
SetupTeardown {
@Override
public KV<String, GenerateContentResponse> call(KV<String, GenerateContentRequest> requestKV)
throws UserCodeExecutionException {
GenerateContentResponse response =
client.generateContent(requestKV.getValue().getContentsList());
return KV.of(requestKV.getKey(), response);
}
@Override
public void setup() throws UserCodeExecutionException {
vertexAI = new VertexAI(getProjectId(), getLocation());
client = new GenerativeModel(getModelName(), vertexAI);
}
@Override
public void teardown() throws UserCodeExecutionException {
vertexAI.close();
}
}
Full example
The full example is shown below.
Key to this example is that com.google.cloud.vertexai.VertexAI and com.google.cloud.vertexai.generativeai.GenerativeModel are not serializable and therefore need to be declared transient and instantiated in setup. You can ignore @MonotonicNonNull if your Java project does not use the Checker Framework (https://checkerframework.org/).
/**
* Example {@link Caller} and {@link SetupTeardown} implementation for use with {@link
* RequestResponseIO} to process Gemini AI {@link GenerateContentRequest}s into {@link
* GenerateContentResponse}s.
*/
@AutoValue
abstract class GeminiAIClient
implements Caller<KV<String, GenerateContentRequest>, KV<String, GenerateContentResponse>>,
SetupTeardown {
static Builder builder() {
return new AutoValue_GeminiAIClient.Builder();
}
static final String MODEL_GEMINI_PRO = "gemini-pro";
static final String MODEL_GEMINI_PRO_VISION = "gemini-pro-vision";
private transient @MonotonicNonNull VertexAI vertexAI;
private transient @MonotonicNonNull GenerativeModel client;
@Override
public KV<String, GenerateContentResponse> call(KV<String, GenerateContentRequest> requestKV)
throws UserCodeExecutionException {
String key = requestKV.getKey();
GenerateContentRequest request = requestKV.getValue();
if (request == null) {
throw new UserCodeExecutionException("request is empty");
}
if (request.getContentsList().isEmpty()) {
throw new UserCodeExecutionException("contentsList is empty");
}
try {
GenerateContentResponse response =
checkStateNotNull(client).generateContent(request.getContentsList());
return KV.of(key, response);
} catch (IOException e) {
throw new UserCodeExecutionException(e);
}
}
@Override
public void setup() throws UserCodeExecutionException {
vertexAI = new VertexAI(getProjectId(), getLocation());
client = new GenerativeModel(getModelName(), vertexAI);
}
@Override
public void teardown() throws UserCodeExecutionException {
if (vertexAI != null) {
vertexAI.close();
}
}
abstract String getModelName();
abstract String getProjectId();
abstract String getLocation();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setModelName(String name);
abstract Optional<String> getModelName();
abstract Builder setProjectId(String value);
abstract Builder setLocation(String value);
abstract GeminiAIClient autoBuild();
final GeminiAIClient build() {
if (!getModelName().isPresent()) {
setModelName(MODEL_GEMINI_PRO);
}
return autoBuild();
}
}
}
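Why the client fields are transient and rebuilt in setup can be seen in a small JDK-only experiment. In the sketch below, FakeClient and FakeCaller are hypothetical stand-ins for a non-serializable API client and a Caller with SetupTeardown; the round trip through Java serialization is an assumption about roughly what happens when an object is shipped to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Standalone sketch of a transient client that is rebuilt after deserialization. */
final class TransientClientSketch {
  /** Hypothetical non-serializable client, standing in for a real API client. */
  static final class FakeClient {
    String ask(String prompt) {
      return "echo: " + prompt;
    }
  }

  /** Hypothetical caller: configuration is serialized, the client is not. */
  static final class FakeCaller implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String modelName; // Serializable configuration travels with the object.
    private transient FakeClient client; // Skipped by serialization; rebuilt in setup().

    FakeCaller(String modelName) {
      this.modelName = modelName;
    }

    void setup() {
      client = new FakeClient();
    }

    String call(String request) {
      if (client == null) {
        throw new IllegalStateException("setup() was not called");
      }
      return modelName + " " + client.ask(request);
    }

    void teardown() {
      client = null; // A real client would be closed here.
    }

    boolean hasClient() {
      return client != null;
    }
  }

  /** Serializes and deserializes the caller, simulating shipping it to a worker. */
  static FakeCaller roundTrip(FakeCaller caller) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(caller);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (FakeCaller) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    FakeCaller copy = roundTrip(new FakeCaller("model"));
    System.out.println("client present after round trip: " + copy.hasClient());
    copy.setup();
    System.out.println(copy.call("hello"));
    copy.teardown();
  }
}
```

Because the transient field comes back as null, any code path that uses the client before setup has run must fail loudly, which is the role checkStateNotNull plays in the full example.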
Ask Gemini AI to identify the image
Now let’s combine the previous example of acquiring an image with this Gemini AI client to ask it to identify the image.
Below is what we saw previously but encapsulated in a convenience method. It takes a List of URLs and returns a PCollection of ImageResponses containing the image data.
/**
* Processes a list of raw image URLs into a {@link ImageResponse} {@link PCollection} using
* {@link RequestResponseIO}. The resulting {@link KV#getKey} is the original image URL.
*/
static Result<KV<String, ImageResponse>> imagesOf(List<String> urls, Pipeline pipeline) {
Coder<KV<String, ImageResponse>> kvCoder = KvCoder.of(STRING_CODER, ImageResponseCoder.of());
return requestsOf(urls, pipeline)
.apply(
ImageResponse.class.getSimpleName(),
RequestResponseIO.of(HttpImageClient.of(), kvCoder));
}
Next we convert the ImageResponses into a PCollection of GenerateContentRequests.
// PCollection<KV<String, ImageResponse>> imagesKV = ...
return imagesKV
.apply(
stepName,
MapElements.into(requestKVType)
.via(
kv -> {
String key = kv.getKey();
ImageResponse safeResponse = checkStateNotNull(kv.getValue());
ByteString data = safeResponse.getData();
return buildAIRequest(key, prompt, data, safeResponse.getMimeType());
}))
.setCoder(kvCoder);
Finally, we apply the PCollection of GenerateContentRequests to RequestResponseIO, instantiated using the GeminiAIClient defined above. Notice that instead of RequestResponseIO.of, we are using RequestResponseIO.ofCallerAndSetupTeardown. The ofCallerAndSetupTeardown method just tells the compiler that we are providing an implementation of both the Caller and SetupTeardown interfaces.
// PCollection<KV<String, GenerateContentRequest>> requestKV = ...
// GeminiAIClient client =
// GeminiAIClient.builder()
// .setProjectId(options.getProject())
// .setLocation(options.getLocation())
// .setModelName(MODEL_GEMINI_PRO_VISION)
// .build();
return requestKV.apply(
"Ask Gemini AI", RequestResponseIO.ofCallerAndSetupTeardown(client, responseCoder));
The full end-to-end pipeline is shown below.
/** Demonstrates using Gemini AI to identify images, acquired from their URLs. */
static void whatIsThisImage(List<String> urls, GeminiAIOptions options) {
// GeminiAIOptions options = PipelineOptionsFactory.create().as(GeminiAIOptions.class);
// options.setLocation("us-central1");
// options.setProjectId("your-google-cloud-project-id");
//
//
// List<String> urls = ImmutableList.of(
// "https://storage.googleapis.com/generativeai-downloads/images/cake.jpg",
// "https://storage.googleapis.com/generativeai-downloads/images/chocolate.png",
// "https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg",
// "https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg",
// "https://storage.googleapis.com/generativeai-downloads/images/factory.png",
// "https://storage.googleapis.com/generativeai-downloads/images/scones.jpg"
// );
// Step 1: Instantiate GeminiAIClient, the Caller and SetupTeardown implementation.
GeminiAIClient client =
GeminiAIClient.builder()
.setProjectId(options.getProject())
.setLocation(options.getLocation())
.setModelName(MODEL_GEMINI_PRO_VISION)
.build();
Pipeline pipeline = Pipeline.create(options);
// Step 2: Download the images from the list of urls.
Result<KV<String, ImageResponse>> getImagesResult = Images.imagesOf(urls, pipeline);
// Step 3: Log any image download errors.
getImagesResult.getFailures().apply("Log get images errors", Log.errorOf());
// Step 4: Build Gemini AI requests from the downloaded image data with the prompt 'What is this
// picture?'.
PCollection<KV<String, GenerateContentRequest>> requests =
buildAIRequests("Identify Image", "What is this picture?", getImagesResult.getResponses());
// Step 5: Using RequestResponseIO, ask Gemini AI 'What is this picture?' for each downloaded
// image.
Result<KV<String, GenerateContentResponse>> responses = askAI(client, requests);
// Step 6: Log any Gemini AI errors.
responses.getFailures().apply("Log AI errors", Log.errorOf());
// Step 7: Log the result of Gemini AI's image recognition.
responses.getResponses().apply("Log AI answers", Log.infoOf());
pipeline.run();
}
Below is an abbreviated output of running the full pipeline, where we see the result of Gemini AI identifying the images.
KV{https://storage.googleapis.com/generativeai-downloads/images/chocolate.png, candidates {
content {
role: "model"
parts {
text: " This is a picture of a chocolate bar."
}
}
KV{https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg, candidates {
content {
role: "model"
parts {
text: " The picture is a dog walking application form. It has two sections, one for information
about the dog and one for information about the owner. The dog\'s name is Fido,
he is a Cavoodle, and he is black and tan. He is 3 years old and has a friendly
temperament. The owner\'s name is Mark, and his phone number is 0491570006. He would
like Fido to be walked once a week on Tuesdays and Thursdays in the morning."
}
}
}
KV{https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg, candidates {
content {
role: "model"
parts {
text: " The picture shows a basket of croissants. Croissants are a type of pastry that is made
from a yeast-based dough that is rolled and folded several times in the rising process.
The result is a light, flaky pastry that is often served with butter, jam, or chocolate.
Croissants are a popular breakfast food and can also be used as a dessert or snack."
}
}
}
Last updated on 2025/01/22