Drupal Planet

Pierce Lamb: Custom Workflow Orchestration in Python

3 months 2 weeks ago

(I put Custom Workflow Orchestration In Python in an Art Generator and the above popped out)

This post originally appeared on my employer VISO Trust’s blog. It is lightly edited and reproduced here.

On the Data & Machine Learning team at VISO Trust, one of our core goals is to provide Document Intelligence to the audit team. Every document that passes through the system is subject to collection, parsing, reformatting, analysis, reporting and more. Every day, we work to expand this feature set, increase its accuracy and deliver faster results.

Why we needed workflow orchestration

Document Intelligence is the product of many individual tasks, including but not limited to:

  • Security Control Language Detections
  • Audit Framework Control ID Detections
  • Named Entity Extraction like organizations, dates and more
  • Decryption of encrypted PDFs
  • Translation of foreign-language PDFs
  • Document Classification
  • Document Section Detection

Before we implemented workflow orchestration, the features listed above (and more) all lived inside a single function. Over time, this function became unwieldy and difficult to read, with snippets of ceremony, controls, logging, function calls and more sprinkled throughout. Moreover, this is one of the most important areas of our app, and new features are implemented here regularly, so the need to clean up this code and make it easier to reason about was clear. Furthermore, execution inside this function was sequential even though some of its function calls could run in parallel. Parallel execution wasn’t required yet, but we knew that features on the near-term roadmap would necessitate it. That left us with two requirements:

  • task execution that is easier to reason about and
  • the ability to execute in parallel

We knew we needed to either adopt an existing workflow orchestration tool or write our own. We began with a rough analysis of our main automation function: we formalized each ‘step’ into a concept called a Task and theorized about which Tasks could execute in parallel. At the time of the analysis, we had 11 Tasks, each of which required certain inputs and produced certain outputs; based on these inputs and outputs, we determined that several could run in parallel. With this context, we reviewed two of the major open-source Python toolkits for workflow orchestration:

  • Luigi
  • Apache Airflow

Both of these toolkits are designed for managing workflows with tens, hundreds or even thousands of tasks that can take days or weeks to finish. They have complex schedulers, user interfaces, failure modes, options for a variety of input and output modes and more. Our pipeline will reach this level of complexity someday, but with an 11-Task pipeline, we decided that these toolkits added too much complexity for our use case. Instead, we resolved to build a custom workflow orchestration toolkit informed by the design of these more mature tools.

Our custom workflow orchestration

The first goal was to generalize all of the steps in our automation service into the concept of a Task. A few examples of a Task would be:

  • detecting a document’s language,
  • translating a foreign language document,
  • processing OCR results into raw text,
  • detecting keywords inside text,
  • running machine learning inference on text.

Just reading this list gives a feel for how each Task depends on a previous Task’s output. Being explicit about dependencies is core to workflow orchestration, so the first step in defining our Task concept was declaring what inputs a given Task requires and what outputs it produces. To demonstrate Tasks, we will develop a hypothetical example Task called DocClassifyInference, whose goal is to run ML inference to classify a given document. Imagine that our model uses both images of the raw PDF file and the text inside it to make predictions. Our Task, then, requires the decrypted PDF and the paginated text of the PDF in order to execute. Further, when it’s complete, it writes a file containing its results to S3. Thus, the start of our example Task might look like:

https://medium.com/media/094d252043626f462dee2692b54f2b29/href

DocClassifyInference subclasses S3Task, an abstract class that requires subclasses to define a method that writes to S3. S3Task is itself a subclass of the Task class, which requires subclasses to define input keys, output keys and an execute method. The keys are enforced in a Pipeline class:

https://medium.com/media/767e8d6e412c0ec6b472df79028c536d/href

This Pipeline becomes the object that manages state as our Tasks execute. In our case, we were not approaching memory limits, so we decided to keep most Task state in memory, though this could easily be changed to always write to and read from storage. As a state manager, the Pipeline can also capture, before any Task executes, ceremony that downstream Tasks may require.
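Since the original classes are not reproduced here, the following is a minimal sketch of how Task, S3Task and Pipeline could enforce these contracts with Python’s abc module. The names input_keys, output_keys, execute, S3Task and Pipeline come from the post; the write_to_s3 name and signature, the get/set method names and the dict-backed state are assumptions for illustration.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class Task(ABC):
    """Every Task declares the Pipeline keys it reads and writes."""

    @property
    @abstractmethod
    def input_keys(self) -> List[str]:
        """Keys this Task needs before it can run."""

    @property
    @abstractmethod
    def output_keys(self) -> List[str]:
        """Keys this Task promises to set."""

    @abstractmethod
    def execute(self, pipeline: "Pipeline") -> "Pipeline":
        """Read inputs from the Pipeline, do the work, write outputs back."""


class S3Task(Task):
    """A Task whose results must also be persisted to S3."""

    @abstractmethod
    def write_to_s3(self, results: Dict[str, Any]) -> str:
        """Persist results; returns the S3 key written (hypothetical signature)."""


class Pipeline:
    """In-memory state manager passed from Task to Task."""

    def __init__(self, initial_state: Optional[Dict[str, Any]] = None) -> None:
        # Ceremony captured before any Task runs (e.g. a document ID) seeds the state.
        self._state: Dict[str, Any] = dict(initial_state or {})

    def get(self, task: Task, key: str) -> Any:
        # A Task may only read keys it declared in input_keys.
        if key not in task.input_keys:
            raise KeyError(f"{type(task).__name__} did not declare input {key!r}")
        return self._state[key]

    def set(self, task: Task, key: str, value: Any) -> None:
        # A Task may only write keys it declared in output_keys.
        if key not in task.output_keys:
            raise KeyError(f"{type(task).__name__} did not declare output {key!r}")
        self._state[key] = value
```

A subclass can satisfy the abstract properties with plain class attributes such as input_keys = ["decrypted_pdf"]; forgetting execute or write_to_s3 makes instantiation fail with a TypeError, which is exactly the kind of early enforcement described above.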

Continuing with DocClassifyInference: as a subclass of the abstract class Task, DocClassifyInference must also implement the execute method (enforced by Task). This method takes a Pipeline and returns a Pipeline. In essence, it receives the state manager, modifies the state and returns it so the next Task can operate on it. In our example, execute extracts the decrypted PDF and paginated text so they can be used as inputs to an ML model that performs document classification. Let’s look at the entire stubbed-out DocClassifyInference:

https://medium.com/media/e70499b33e3aa2979d5713f967406298/href
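To make the shape of such a Task concrete, here is a self-contained sketch. It is not the post’s code: the key names (decrypted_pdf, paginated_text, doc_classification), the keyword-based predict() placeholder standing in for a real model, and the no-op write_to_s3() are hypothetical, and the Pipeline is a minimal stand-in that only checks declared keys.

```python
from typing import Any, Dict, List


class Pipeline:
    """Minimal stand-in for the state manager (illustrative only)."""

    def __init__(self, state: Dict[str, Any]) -> None:
        self.state = dict(state)

    def get(self, task: Any, key: str) -> Any:
        if key not in task.input_keys:
            raise KeyError(f"undeclared input {key!r}")
        return self.state[key]

    def set(self, task: Any, key: str, value: Any) -> None:
        if key not in task.output_keys:
            raise KeyError(f"undeclared output {key!r}")
        self.state[key] = value


class DocClassifyInference:
    """Hypothetical stub: classify a document from its PDF bytes and page text."""

    input_keys: List[str] = ["decrypted_pdf", "paginated_text"]
    output_keys: List[str] = ["doc_classification"]

    def predict(self, pdf_bytes: bytes, pages: List[str]) -> str:
        # Placeholder for real model inference on page images plus text.
        return "soc2_report" if any("SOC 2" in page for page in pages) else "other"

    def write_to_s3(self, results: Dict[str, Any]) -> None:
        # Placeholder: a real implementation would upload results to S3.
        pass

    def execute(self, pipeline: Pipeline) -> Pipeline:
        # 1. Pull the declared inputs out of the shared state.
        pdf = pipeline.get(self, "decrypted_pdf")
        pages = pipeline.get(self, "paginated_text")
        # 2. Do the work.
        label = self.predict(pdf, pages)
        # 3. Set only the declared output, persist it, and hand the state on.
        pipeline.set(self, "doc_classification", label)
        self.write_to_s3({"doc_classification": label})
        return pipeline
```

Chaining Tasks is then just pipeline = DocClassifyInference().execute(pipeline), repeated for each Task in turn.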

It’s easy to see how DocClassifyInference gets the Pipeline state, extracts what it needs, operates on that data, sets what it has declared it’s going to set and returns the Pipeline. This allows for an API like this:

https://medium.com/media/c6c6331180e2c768bebe8b6d3d93e156/href

This was, of course, much cleaner than what we had before. It also makes it easy to write understandable unit tests for each Task and adheres more closely to functional programming principles. That meets our first goal of making the code cleaner and easier to reason about. What about parallel processing?
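The sequential API and per-Task testability can be illustrated with a small, self-contained sketch. To keep it short, a plain dict stands in for the Pipeline and each toy Task returns a new dict rather than mutating shared state (a variation on the post’s design); the Task bodies are hypothetical placeholders, and functools.reduce simply replays pipeline = task.execute(pipeline) for each Task in order.

```python
from functools import reduce
from typing import Any, Dict, List

State = Dict[str, Any]


class DecryptPDF:
    """Toy Task: pretend to decrypt the raw upload (hypothetical logic)."""
    input_keys = ["raw_pdf"]
    output_keys = ["decrypted_pdf"]

    def execute(self, state: State) -> State:
        # Return a new dict instead of mutating the input, functional-style.
        return {**state, "decrypted_pdf": state["raw_pdf"].replace(b"ENC:", b"")}


class PaginatedText:
    """Toy Task: split the decrypted bytes into per-page strings on form feeds."""
    input_keys = ["decrypted_pdf"]
    output_keys = ["paginated_text"]

    def execute(self, state: State) -> State:
        pages = state["decrypted_pdf"].decode().split("\f")
        return {**state, "paginated_text": pages}


def run_sequentially(tasks: List[Any], state: State) -> State:
    # Equivalent to writing state = task.execute(state) once per Task.
    return reduce(lambda acc, task: task.execute(acc), tasks, state)
```

Because each Task only touches its declared keys, a unit test can exercise a single Task with a hand-built input, for example DecryptPDF().execute({"raw_pdf": b"ENC:x"}), without running the rest of the pipeline.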

Parallel Processing

Similar to Luigi and Apache Airflow, our workflow orchestration builds a Directed Acyclic Graph (DAG) of Tasks and sorts it topologically. In short, because each Task explicitly declares its required inputs and intended outputs, the Tasks can be ordered so that every Task runs after the Tasks it depends on, while independent Tasks can run side by side. We no longer need to write the Tasks down in sequential order like the API described above; instead, we can pass a list of Tasks to a Task Planner and let it decide how to execute them optimally. What we want, then, is a Task Planner that receives a list of Tasks, sorts them topologically and returns a list whose members are themselves lists of Tasks. Let’s look at what this might look like using some of our examples from above:

https://medium.com/media/0a663e1d21fad19f6742fcb8626491c2/href

Here I have retained our examples while adding two new Tasks: KeywordDetection and CreateCSVOutput. You can imagine these as matching keywords in the paginated text, and combining the results of RunDocInference and KeywordDetection into a formatted CSV output. When the Task Planner receives this list, we want it to topologically sort the Tasks and output a data structure that looks like this:

https://medium.com/media/1f13a65d01989925558a170c8f56b694/href

In the above list, you can think of each member as a ‘stage’ of execution. Each stage contains one or more Tasks: a stage with one Task executes it on its own, and a stage with several Tasks executes them in parallel. The stages themselves run one after another. In English, the expected_task_plan can be described like so:

  • DecryptPDF depends on nothing and creates a consumable PDF,
  • PaginatedText depends on a consumable PDF and creates a list of strings
    - RunDocInference depends on both and classifies the document
    - KeywordDetection depends on paginated text and produces matches
  • CreateCSVOutput depends on doc classification and keyword detection and produces a formatted CSV of their outputs.

An example of the function that creates the expected_task_plan above might look like:

https://medium.com/media/3c129c3f6c9c4e794bec10497bf706a7/href

This function gets the list of Tasks, ensures that no two Tasks declare the same output key, adds the nodes to a sorter by interrogating each Task’s input_keys and output_keys, and sorts them topologically. In our case, the sorter is TopologicalSorter from the graphlib module in Python’s standard library (available since Python 3.9). Getting into what each of these functions is doing would take us too far afield, so we will move on to executing a task plan.
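Here is a minimal sketch of a planner in this spirit, built on graphlib.TopologicalSorter. It is our illustration rather than the post’s function: it assumes each Task exposes input_keys and output_keys lists, and it treats input keys that no Task produces (for example, data seeded into the Pipeline before execution) as already available. The stage-by-stage loop uses prepare(), get_ready() and done(), the pattern the graphlib documentation describes for parallel processing.

```python
from graphlib import TopologicalSorter
from typing import Any, Dict, List


def get_task_plan(tasks: List[Any]) -> List[List[Any]]:
    """Group Tasks into stages; Tasks within one stage can run in parallel."""
    # Map every output key to the single Task that produces it.
    producer: Dict[str, Any] = {}
    for task in tasks:
        for key in task.output_keys:
            if key in producer:
                raise ValueError(f"output key {key!r} is produced by two Tasks")
            producer[key] = task

    # A Task depends on whichever Tasks produce its input keys.
    # Inputs with no producer (seeded state) add no edge.
    sorter = TopologicalSorter()
    for task in tasks:
        deps = {producer[key] for key in task.input_keys if key in producer}
        sorter.add(task, *deps)

    # Peel off one stage of ready Tasks at a time.
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    plan: List[List[Any]] = []
    while sorter.is_active():
        stage = list(sorter.get_ready())
        plan.append(stage)
        sorter.done(*stage)
    return plan
```

Fed the five example Tasks above (in any order), this yields the four stages of expected_task_plan: DecryptPDF, then PaginatedText, then RunDocInference and KeywordDetection together, then CreateCSVOutput. The order of Tasks inside a stage is not guaranteed, which is fine because they run in parallel anyway.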

With the expected_task_plan shown above, an execute_task_plan() function is straightforward:

https://medium.com/media/b734c2dfa055816d3da58282a3a85802/href

Here we iterate over the task plan, choosing between sequential and parallel execution for each stage. In the latter case, we use Python’s threading.Thread class to create a thread per Task and the idiomatic start() and join() methods to run them. Wait, then what is TaskThread?

In our case, we wanted to ensure that an exception in a child thread is always raised in the calling thread so the calling thread can exit immediately. So we extended threading.Thread with our own class called TaskThread. Overriding threading.Thread’s run() method is common (its own docstring says you may override it in a subclass); we overrode run() to catch any exception and store it in an instance variable, and then we check that variable at join() time and re-raise it.

https://medium.com/media/32dad77b5be2c90b1dc3e9ef857987cc/href

The calling thread can now try/except at .join() time.
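Putting the two pieces together, here is a minimal sketch of a TaskThread and an execute_task_plan() along the lines described above. It assumes, as the post describes, that execute() modifies the shared Pipeline in place and returns it, so the return values of parallel Tasks can be ignored; the constructor arguments and the choice to catch Exception are our own.

```python
import threading
from typing import Any, List, Optional


class TaskThread(threading.Thread):
    """Thread that stores an exception from run() and re-raises it in join()."""

    def __init__(self, task: Any, pipeline: Any) -> None:
        super().__init__(name=type(task).__name__)
        self.task = task
        self.pipeline = pipeline
        self.exc: Optional[Exception] = None

    def run(self) -> None:
        try:
            self.task.execute(self.pipeline)
        except Exception as exc:  # keep it for the calling thread
            self.exc = exc

    def join(self, timeout: Optional[float] = None) -> None:
        super().join(timeout)
        if self.exc is not None:
            raise self.exc


def execute_task_plan(task_plan: List[List[Any]], pipeline: Any) -> Any:
    """Run stages in order; a stage with several Tasks runs them in threads."""
    for stage in task_plan:
        if len(stage) == 1:
            pipeline = stage[0].execute(pipeline)
        else:
            threads = [TaskThread(task, pipeline) for task in stage]
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()  # re-raises the first stored exception, in join order
    return pipeline
```

If a parallel Task fails, its exception surfaces in the calling thread at join(); threads later in the same stage are not joined, so any that are still running simply finish on their own.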

Conclusion

With these structures in place, the file containing the automation service’s primary functions shrank from ~500 lines to ~90. Now, when we create our thread pool to consume SQS messages, we build the Task plan once with task_plan = get_task_plan() and pass it into each thread. Once execution reaches the main function for performing document intelligence, what was previously a large section of difficult-to-read code becomes:

https://medium.com/media/86e2f5c586197e6dd85c8f3c16d22409/href

Executing these Tasks in parallel consistently shaved time off of document intelligence processing (about a minute on average). The real benefit of this change, however, will come in the future as we add more Tasks to the pipeline that can be processed in parallel.

While we’ve reduced the time-to-audit significantly from the former state-of-the-art, we are definitely not done. Features like the above will enable us to continue reducing this time while maintaining consistent processing times. We hope this blog helps you in your workflow orchestration research.

Lullabot: Bitmasks in JavaScript: A Computer Science Crash Course

3 months 2 weeks ago

One of the nice things about front-end web development as a career choice is that the software and coding languages are available on every modern machine. It doesn’t matter what operating system or how powerful your machine is. HTML, CSS, and JavaScript will run pretty well on it. This lets a lot of us in the industry bypass the need for formal education in computer science.

Unfortunately, this also has the side effect of leaving little gaps in our knowledge here and there, especially in strategies like bitmasking, which are seldom used in web development.

Specbee: Marketo Webhook Integration with Drupal: Sync Lead Data from Marketo to Drupal in Real-Time

3 months 2 weeks ago
Shefali Shetty, 09 Aug, 2022

When the Association of National Advertisers (ANA) names “Personalization” the “marketing word of the year”, you can probably feel comfortable that it is a strategy that’s here to stay. Personalized content adds a human touch to the customer experience, something that is priceless throughout their journey. Stats back this up, suggesting that 90% of consumers find personalized content more appealing and get annoyed when content is not personalized.

Marketing automation software giant, Marketo, helps B2B and B2C organizations engage and nurture potential leads while enabling marketers to create personalized marketing campaigns around them. 

Combining the power of Marketo with a content management system like Drupal is one of the best ways to present a completely seamless digital experience to customers. 

With Drupal-Marketo integration modules like Marketo MA, you can automate lead capturing, tracking, nurturing, personalization, analytics and much more. Your Drupal website may also be connected to other third-party services that often need updated lead data from Marketo. Enter Webhooks. In one of our recent projects, we used Webhooks to get real-time data from Marketo so that content could be personalized for customers when they log in. Read on to find out about the Drupal-Marketo integration and how to configure a Webhook to synchronize Marketo data with Drupal in real time.

Setting up Marketo in Drupal

Before setting up the Drupal-Marketo integration, note that this process assumes you have already set up your Marketo account and know how the platform works.

Installing the Marketo MA Drupal Module

In your Drupal admin, install the Marketo MA module from its Drupal.org project page. Next, go to Extend and enable the following modules (as shown in the screengrab below):

  • Marketo MA User
  • Marketo MA Webform
  • Marketo MA

API Configuration

Now, activate your Marketo integration by entering the Marketo account ID and other lead-capturing details. Here, we use the REST API method to track lead data instead of the Munchkin JavaScript API, so enter the REST API configuration settings such as the Client ID and the Client Secret.

Field Definition

Here’s where you configure and map your user and Webform fields to the fields defined in your Marketo account (as shown in the below screengrab).

  User Settings

In this section, you can enable a trigger to update the lead in Marketo during events like a User login, registration/creation, and an update to the user profile. You can also choose the User fields that should trigger the update and map it to the Marketo field.

  Adding the Webform Handler

Now select the Marketo MA webform handler to make sure that the lead is captured via webforms and sent to Marketo.

 

This setup will now let you add lead capturing, tracking, and nurturing capabilities on your Drupal site. You are now ready to send leads from Drupal to your Marketo platform.

How to Configure a Webhook to get Updated Lead Data from Marketo to Drupal

Your leads can come in from different sources. Several of your leads come in through your website's webform, while others may be entered directly into Marketo's dashboard through different marketing channels. 

Sometimes, the user data that is captured and sent over from your Drupal site might get updated on the Marketo dashboard. What happens when you need real-time updated data from Marketo to personalize Drupal content for that user?

The Use Case

Recently, our client’s Drupal website required us to create a Webhook for their content personalization needs. They have a single sign-on system where users log in once and can access multiple site areas like events, member login and shopping. After users log in, content on the Drupal website is personalized based on segmentations like demographics, job levels, etc. This required the Drupal site to have up-to-date user data, synchronized in real time with their Marketo system.

One not-very-feasible solution would be to make an API call to fetch lead data from Marketo on user sign-in. However, not only would this slow down the sign-in process, it would also be more expensive, as API requests are billed.

The Solution - Webhooks

Webhooks are essentially HTTP requests sent automatically when specific events occur. Marketo lets you register Webhooks that call different third-party applications. For this use case, we configured a Webhook to push real-time data from Marketo to the Drupal website. Let’s dive into the steps we took to implement Webhooks for the Drupal-Marketo integration.

Step 1: Create a custom module and define a route for API

First, create a custom module (here called marketo_webhook) and enable Drupal core’s HTTP Basic Authentication module, which the route below relies on.

marketo_webhook.routing.yml

marketo_webhook.webhook:
  path: '/webhooks/marketo'
  options:
    _auth: [ 'basic_auth' ]
  requirements:
    _user_is_logged_in: 'TRUE'
  defaults:
    _controller: '\Drupal\marketo_webhook\Controller\MarketoWebhookController::getMarketoLeads'
  methods: [POST]

Step 2: Create a controller for the API and store the data in custom fields

src/Controller/MarketoWebhookController.php

<?php

namespace Drupal\marketo_webhook\Controller;

use Drupal\Core\Controller\ControllerBase;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;

/**
 * Controller for the Marketo Webhook.
 */
class MarketoWebhookController extends ControllerBase {

  /**
   * The entity type manager.
   *
   * @var \Drupal\Core\Entity\EntityTypeManagerInterface
   */
  protected $entityTypeManager;

  public function __construct(EntityTypeManagerInterface $entityTypeManager) {
    $this->entityTypeManager = $entityTypeManager;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container) {
    return new static(
      $container->get('entity_type.manager')
    );
  }

  /**
   * Updates the Marketo-sourced fields on the matching Drupal user.
   */
  public function getMarketoLeads(Request $request) {
    // Log the raw body; implode() would fail on a null or nested payload.
    $this->getLogger('marketo_webhook')->notice('Marketo payload: @payload', ['@payload' => $request->getContent()]);

    $payload = json_decode($request->getContent(), TRUE);
    // Reject anything that is not a JSON object carrying an email address.
    if (!is_array($payload) || empty($payload['mail'])) {
      return new JsonResponse('Invalid payload', 400);
    }

    $users = $this->entityTypeManager->getStorage('user')
      ->loadByProperties(['mail' => $payload['mail']]);
    $user = reset($users);
    if (!$user) {
      return new JsonResponse('User not found', 404);
    }

    // 'NA' is the default value set in the Marketo Webhook template (Step 3).
    if (isset($payload['field_job_function']) && $payload['field_job_function'] !== 'NA') {
      $user->set('field_job_function', $payload['field_job_function']);
    }
    $user->save();
    return new JsonResponse('Success', 200);
  }

}

Step 3: Create the Webhook and Marketo Integration

First, register the Webhook in Marketo. Open your Marketo dashboard and click the Webhooks option under the Admin >> Integration menu (as shown in the screengrab below).

 

Next, create a New Webhook which will open up a dialog box where you can enter details like the Webhook Name, Description, URL, Request Type, Template, etc.

 

Give a name to the Webhook and an easily understandable description. Enter the URL to submit the web service request.

For example, here
https://www.specbee.com/webhooks/marketo
is the API endpoint for our Webhook. Add the Drupal username and password for basic authentication as shown below:
https://username:password@www.specbee.com/webhooks/marketo

Click the Insert Token button next to Template to add the fields of the Marketo object that you want to pass with the request (the controller above also expects the lead’s email address under the "mail" key).
For example: "field_job_function": "{{lead.Job Function:default=NA}}". Set the default value to any placeholder of your choice (‘NA’ in our case); the token then resolves to NA when the lead has no data for that field.
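Before wiring up the Smart Campaign, it can help to send the endpoint a request shaped like the one Marketo will send. The Python sketch below is an illustration, not part of the original module: it builds (but does not send) such a POST with urllib from the standard library. The URL, credentials and email address are placeholders, and the payload assumes the template sends the lead’s email under "mail", which the controller requires.

```python
import base64
import json
import urllib.request


def build_webhook_test_request(url: str, username: str, password: str,
                               payload: dict) -> urllib.request.Request:
    """Build (but do not send) a POST that mimics Marketo calling the Webhook."""
    # HTTP Basic auth header, equivalent to username:password@ in the URL.
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="POST",
    )
```

Against a development site, passing the returned request to urllib.request.urlopen() should produce the controller’s JSON response, and a payload whose field_job_function is "NA" should leave the user’s field unchanged.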

  Step 4: Create a Smart Campaign

To complete the Webhook Marketo integration, you now need to set up a Smart Campaign. You can define your own Smart Campaigns in Marketo to run actions like calling a Webhook or sending out emails after a certain event. A Smart Campaign configuration has three parts: Smart List, Flow and Schedule. The triggers that fire the Webhook go under Smart List.

  • Under Marketing Activities and within your program, create a new Smart campaign. 
  • Give the Smart Campaign a name and a description. Here we have called it Drupal Integration.
  • Under Smart List, you will find all available Triggers. Drag and drop the triggers you need into the Smart List. Here, we selected the Person is Created trigger, but it fires only when a new lead is created. To also cover updates, add a Data Value Changes trigger so that the campaign fires whenever existing lead data is updated.
  • We have selected the Job Function and Job Level attributes under Person to trigger the webhook (as shown in the below screengrab).

 

  • Now, time to call the Webhook. Click on Flow and select the Call Webhook flow action on the right pane and drag it to the Flow. Select the name of the Webhook you created.

 

  • Now that you have created the campaign to call the Webhook, let’s schedule it.

 

  • In the Smart Campaign Settings, click the Edit button to set how often the campaign should run. For our use case, we selected “every time” because we wanted the Webhook to fire every time lead data is updated. Save that setting and click ACTIVATE.
  Step 5: Test it out!

Your campaign is now ready to test. You will be able to see all the activities, that is, the number of calls to the Webhook and other details under the Results tab of the Smart campaign.

So ideally, when you create a new lead (Person) or update the Job level or Job Function field of an existing lead, it should call the Webhook and get the lead updated in your Drupal website’s database as well.

This article would not have been possible without Prashanth’s help! Thank you!

Conclusion

Marketing automation platforms like Marketo can be a valuable addition to any organization's marketing strategy to help engage, nurture, and ultimately convert leads. The use of Drupal as a content management system streamlines these activities. In this article, along with showing you how to integrate Marketo with Drupal, we have also covered how to configure webhooks that can let you get updated lead data from Marketo to Drupal. Need help customizing Drupal integrations with Marketo or any other third-party application? We’d be happy to help!

Author: Shefali Shetty

Meet Shefali Shetty, Director of Marketing at Specbee. An enthusiast for Drupal, she enjoys exploring and writing about the powerhouse. While not working or actively contributing back to the Drupal project, you can find her watching YouTube videos trying to learn to play the Ukulele :)


Talking Drupal: Talking Drupal #359 - Contribution Events

3 months 2 weeks ago

Today we are talking about Contribution Events.

www.talkingDrupal.com/359

Topics
  • What are contribution events
  • What is the contribution event
  • What are the key goals
  • Can you give us a quick overview of how you started the community initiative
  • Why did each of you feel this was important
  • How did you get involved
  • What was involved in the first event
  • What were lessons learned
  • What were the successes of the first event
  • How can someone have a contribution event
  • Are there differences in having events centered on various areas
  • What are the most important resources
  • How can someone get involved
Resources

Guests

Kristen Pol - www.drupal.org/u/kristen-pol @kristen_pol Surabhi Gokte - www.drupal.org/u/surabhi-gokte @SurabhiGokte

Hosts

Nic Laflin - www.nLighteneddevelopment.com @nicxvan John Picozzi - www.epam.com @johnpicozzi Ryan Price - ryanpricemedia.com - @liberatr

MOTW (Module of the Week)

Anonymous Login: This is a very simple, lightweight module that will redirect anonymous users to the login page whenever they reach any admin-specified page paths, and will direct them back to the originally-requested page after successful login.

Matt Glaman: ReactPHP for Drupal deployments and workers

3 months 2 weeks ago

I recently held a live stream where I walked through the continuous integration and deployment (CI/CD) of a Drupal project to DigitalOcean's App Platform and other CI/CD items. App Platform has its quirks, but it's simple to build an application with various components. My project, Whiskey Dex, builds a Docker image that pushes to my container registry and then updates my App Platform manifest to use the new image tag, triggering a deployment. 

Drupal.org - aggregated feeds in category Planet Drupal