Créer un connecteur KAFKA CONNECT

Après avoir essayé le connecteur de confluent kafka-connect-jdbc, je me suis aperçu qu’il y avait encore quelques bugs qui le rendaient inutilisable dans mon environnement ( notamment avec les champs numériques).

J’ai donc décidé de créer mon propre connecteur. Certes celui-ci sera moins générique mais il correspondra à mon besoin ( ou pas …).

API utilisées

  • JAVA8
  • API KAFKA CONNECT

Configuration maven

Cette configuration me permet de créer un fat jar avec les dépendances nécessaires à la bonne exécution du connecteur.

Voici mon fichier pom.xml

 

<project xmlns="http://maven.apache.org/POM/4.0.0"?utm_source=rss&utm_medium=rss xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"?utm_source=rss&utm_medium=rss
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?utm_source=rss&utm_medium=rss http://maven.apache.org/xsd/maven-4.0.0.xsd">?utm_source=rss&utm_medium=rss
    <modelVersion>4.0.0</modelVersion>

    <groupId>info.touret.myconnect</groupId>
    <artifactId>myconnect</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>myconnect</name>
    <url>http://maven.apache.org</url>?utm_source=rss&utm_medium=rss

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <kafka.version>0.10.0.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>LATEST</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>LATEST</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-api-mockito</artifactId>
            <version>1.6.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-module-junit4</artifactId>
            <version>1.6.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.oracle.weblogic</groupId>
            <artifactId>ojdbc7</artifactId>
            <version>12.1.3-0-0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-runtime</artifactId>
            <version>${kafka.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>${kafka.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.7.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <shadedClassifierName>allinone</shadedClassifierName>
                            <artifactSet>
                                <includes>
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <manifestEntries>
                                        <Main-Class></Main-Class>
                                    </manifestEntries>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>


        </plugins>
    </build>
</project>

Développement

Comme le dit le guide de développement, il faut créer a minima une classe héritant de la classe SourceConnector et une classe héritant de SourceTask.

MySourceConnector

Cette classe permet la récupération de la configuration du connecteur.

public class MySourceConnector extends SourceConnector {

    public static final String TOPIC_CONFIG = "topic";

    private static Logger logger = LoggerFactory.getLogger(MySourceConnector.class);

    public String version() {
        return AppInfoParser.getVersion();
    }

    public void start(Map&lt;String, String&gt; map) {
    }

    public Class&lt;? extends Task&gt; taskClass() {
        return MySourceTask.class;
    }

    public List&lt;Map&lt;String, String&gt;&gt; taskConfigs(int maxTasks) {
        ArrayList&lt;Map&lt;String, String&gt;&gt; configs = new ArrayList&lt;&gt;();
        Map&lt;String, String&gt; map = new HashMap&lt;&gt;();
        map.put(TOPIC_CONFIG, "montopic");
        configs.add(map);
        return configs;
    }

    public void stop() {

    }

    public ConfigDef config() {
        return new ConfigDef().define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Kafka Topic");
    }
}

MySourceTask

Cette classe gère l’exécution de l’extraction et chargement dans KAFKA. Elle permet dans la méthode start() de démarrer le connecteur et de lancer les ressources (connexions JDBC).

Comme le connecteur standard, je m’appuie sur un champ de type Timestamp. celui -ci me permet de créer un offset et de faire un parcours incrémental de mes résultats .

public class MySourceTask extends SourceTask {

    private String topic = null;
    private Logger logger = LoggerFactory.getLogger(MySourceTask.class);
    private Connection connection;
    private PreparedStatement preparedStatement;

    public void start(Map<String, String> props) {
        topic = props.get(MySourceConnector.TOPIC_CONFIG);
        logger.warn("Starting Task ");
        try {
            connection = createConnection(); // connexion jdbc classique
            preparedStatement = connection.prepareStatement("select *from mytable");
        } catch (SQLException e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    public List<SourceRecord> poll() throws InterruptedException {
        logger.warn("Polling");
        List<SourceRecord> results = new LinkedList<>();
        try {
// recuperation de la partition
            Map sourcePartition = Collections.singletonMap("table", "mytable"));
// recuperation du dernier offset
            final Timestamp lastRecordedOffset = getLastRecordedOffset(sourcePartition);
            preparedStatement.setTimestamp(1,lastRecordedOffset);
            ResultSet resultSet = preparedStatement.executeQuery();
            while (resultSet.next()) {
// recuperation du champ de type timestamp permettant le chargement incremental
                Timestamp timestamp = resultSet.getTimestamp(18);
// extraction de la ligne (oui je sais c'est bourrin)
                String line = new StringBuilder().append(resultSet.getString(1)).append(resultSet.getString(2)).append(timestamp).toString();
je cree un nouveau offset 
                Map sourceOffSet = Collections.singletonMap("position", new Long(timestamp.getTime()));
                results.add(new SourceRecord(sourcePartition, sourceOffSet, topic, Schema.STRING_SCHEMA, line));
            }
        } catch (SQLException e) {
            throw new InterruptedException(e.getMessage());
        }
        return results;
    }
/**
 * extraction du timestamp du dernier champ extrait. Par defaut, je positionne l'EPOCH
 */
    private Timestamp getLastRecordedOffset(Map<String,Object> partition) {
        Map<String,Object> offset = context.offsetStorageReader().offset(partition);
        Timestamp lastRecordedOffset = Timestamp.from(EPOCH);
        if(offset !=null){
            lastRecordedOffset = new Timestamp((Long)offset.getOrDefault("position",Timestamp.from(EPOCH)));
        }
        return lastRecordedOffset;
    }

    public void stop() {
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (preparedStatement != null) {
                connection.close();
            }
        } catch (SQLException e) {
            logger.error(e.getMessage(), e);
        }
    }

    public String version() {
        return new MySourceConnector().version();
    }
}

Configuration nécessaire à l’exécution du plugin

Il faudra créer également un fichier properties contenant les informations suivantes :

name=my-connector-source
connector.class=info.touret.MySourceConnector

Exécution

De la même manière que pour le connecteur standard…

Conclusion

Voila le squelette de mon connecteur crée. Pour l’instant les données sont sérialisées de manière un peu brutale. La prochaine étape sera de les mettre au format JSON. La suite dans un prochain numéro…

Vus : 598
Publié par Littlewing : 368